Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public class SummaryProfile {
public static final String NEREIDS_LOCK_TABLE_TIME = "Nereids Lock Table Time";
public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time";
public static final String NEREIDS_REWRITE_TIME = "Nereids Rewrite Time";

public static final String NEREIDS_COLLECT_TABLE_PARTITION_TIME = "Nereids Collect Table Partition Time";
public static final String NEREIDS_OPTIMIZE_TIME = "Nereids Optimize Time";
public static final String NEREIDS_TRANSLATE_TIME = "Nereids Translate Time";
public static final String NEREIDS_DISTRIBUTE_TIME = "Nereids Distribute Time";
Expand Down Expand Up @@ -236,6 +238,9 @@ public class SummaryProfile {
private long parseSqlFinishTime = -1;
@SerializedName(value = "nereidsLockTableFinishTime")
private long nereidsLockTableFinishTime = -1;

@SerializedName(value = "nereidsCollectTablePartitionFinishTime")
private long nereidsCollectTablePartitionFinishTime = -1;
@SerializedName(value = "nereidsAnalysisFinishTime")
private long nereidsAnalysisFinishTime = -1;
@SerializedName(value = "nereidsRewriteFinishTime")
Expand Down Expand Up @@ -445,6 +450,8 @@ private void updateExecutionSummaryProfile() {
executionSummaryProfile.addInfoString(NEREIDS_LOCK_TABLE_TIME, getPrettyNereidsLockTableTime());
executionSummaryProfile.addInfoString(NEREIDS_ANALYSIS_TIME, getPrettyNereidsAnalysisTime());
executionSummaryProfile.addInfoString(NEREIDS_REWRITE_TIME, getPrettyNereidsRewriteTime());
executionSummaryProfile.addInfoString(NEREIDS_COLLECT_TABLE_PARTITION_TIME,
getPrettyNereidsCollectTablePartitionTime());
executionSummaryProfile.addInfoString(NEREIDS_OPTIMIZE_TIME, getPrettyNereidsOptimizeTime());
executionSummaryProfile.addInfoString(NEREIDS_TRANSLATE_TIME, getPrettyNereidsTranslateTime());
executionSummaryProfile.addInfoString(NEREIDS_DISTRIBUTE_TIME, getPrettyNereidsDistributeTime());
Expand Down Expand Up @@ -543,6 +550,10 @@ public void setNereidsLockTableFinishTime() {
this.nereidsLockTableFinishTime = TimeUtils.getStartTimeMs();
}

public void setNereidsCollectTablePartitionFinishTime() {
this.nereidsCollectTablePartitionFinishTime = TimeUtils.getStartTimeMs();
}

public void setNereidsAnalysisTime() {
this.nereidsAnalysisFinishTime = TimeUtils.getStartTimeMs();
}
Expand Down Expand Up @@ -817,8 +828,13 @@ public String getPrettyNereidsRewriteTime() {
return getPrettyTime(nereidsRewriteFinishTime, nereidsAnalysisFinishTime, TUnit.TIME_MS);
}


public String getPrettyNereidsCollectTablePartitionTime() {
return getPrettyTime(nereidsCollectTablePartitionFinishTime, nereidsRewriteFinishTime, TUnit.TIME_MS);
}

public String getPrettyNereidsOptimizeTime() {
return getPrettyTime(nereidsOptimizeFinishTime, nereidsRewriteFinishTime, TUnit.TIME_MS);
return getPrettyTime(nereidsOptimizeFinishTime, nereidsCollectTablePartitionFinishTime, TUnit.TIME_MS);
}

public String getPrettyNereidsTranslateTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
Expand All @@ -27,6 +28,7 @@
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.nereids.rules.exploration.mv.PartitionCompensator;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
Expand All @@ -43,6 +45,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -82,13 +85,21 @@ public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos, ConnectContex
boolean forceConsistent, BiPredicate<ConnectContext, MTMV> predicate) {
Set<MTMV> res = Sets.newLinkedHashSet();
Set<BaseTableInfo> mvInfos = getMTMVInfos(tableInfos);
Map<List<String>, Set<String>> queryUsedPartitions = PartitionCompensator.getQueryUsedPartitions(
ctx.getStatementContext());

for (BaseTableInfo tableInfo : mvInfos) {
try {
MTMV mtmv = (MTMV) MTMVUtil.getTable(tableInfo);
if (predicate.test(ctx, mtmv)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add materializedView.isUseForRewrite

continue;
}
if (isMVPartitionValid(mtmv, ctx, forceConsistent)) {
if (!mtmv.isUseForRewrite()) {
continue;
}
BaseTableInfo relatedTableInfo = mtmv.getMvPartitionInfo().getRelatedTableInfo();
if (isMVPartitionValid(mtmv, ctx, forceConsistent,
relatedTableInfo == null ? null : queryUsedPartitions.get(relatedTableInfo.toList()))) {
res.add(mtmv);
}
} catch (Exception e) {
Expand Down Expand Up @@ -117,10 +128,15 @@ public Set<MTMV> getAllMTMVs(List<BaseTableInfo> tableInfos) {
}

@VisibleForTesting
public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean forceConsistent) {
public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean forceConsistent,
Set<String> relatedPartitions) {
long currentTimeMillis = System.currentTimeMillis();
return !CollectionUtils
.isEmpty(MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMillis, forceConsistent));
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil.getMTMVCanRewritePartitions(
mtmv, ctx, currentTimeMillis, forceConsistent, relatedPartitions);
// MTMVRewriteUtil.getMTMVCanRewritePartitions is time-consuming behavior, So record for used later
ctx.getStatementContext().getMvCanRewritePartitionsMap().putIfAbsent(
new BaseTableInfo(mtmv), mtmvCanRewritePartitions);
return !CollectionUtils.isEmpty(mtmvCanRewritePartitions);
}

private Set<BaseTableInfo> getMTMVInfos(List<BaseTableInfo> tableInfos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class MTMVRewriteUtil {
private static final Logger LOG = LogManager.getLogger(MTMVRewriteUtil.class);
Expand All @@ -43,7 +47,8 @@ public class MTMVRewriteUtil {
* @return
*/
public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx,
long currentTimeMills, boolean forceConsistent) {
long currentTimeMills, boolean forceConsistent,
Set<String> relatedPartitions) {
List<Partition> res = Lists.newArrayList();
Collection<Partition> allPartitions = mtmv.getPartitions();
MTMVRelation mtmvRelation = mtmv.getRelation();
Expand All @@ -55,6 +60,11 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
if (mtmvStatus.getState() != MTMVState.NORMAL || mtmvStatus.getRefreshState() == MTMVRefreshState.INIT) {
return res;
}
// if relatedPartitions is empty but not null, which means query no partitions
if (relatedPartitions != null && relatedPartitions.size() == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments to explain that if it is null, it means that all partitions need to be calculated, and if size is 0, it means that no partition needs to participate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, add more comment

return res;
}
Set<String> mtmvNeedComparePartitions = null;
MTMVRefreshContext refreshContext = null;
// check gracePeriod
long gracePeriodMills = mtmv.getGracePeriod();
Expand All @@ -73,6 +83,14 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
return res;
}
}
if (mtmvNeedComparePartitions == null) {
mtmvNeedComparePartitions = getMtmvPartitionsByRelatedPartitions(mtmv, refreshContext,
relatedPartitions);
}
// if the partition which query not used, should not compare partition version
if (!mtmvNeedComparePartitions.contains(partition.getName())) {
continue;
}
try {
if (MTMVPartitionUtil.isMTMVPartitionSync(refreshContext, partition.getName(),
mtmvRelation.getBaseTablesOneLevel(),
Expand All @@ -86,4 +104,29 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
}
return res;
}

private static Set<String> getMtmvPartitionsByRelatedPartitions(MTMV mtmv, MTMVRefreshContext refreshContext,
Set<String> relatedPartitions) {
// if relatedPartitions is null, which means QueryPartitionCollector visitLogicalCatalogRelation can not
// get query used partitions, should get all mtmv partitions
if (relatedPartitions == null) {
return mtmv.getPartitionNames();
}
Set<String> res = Sets.newHashSet();
Map<String, String> relatedToMv = getRelatedToMv(refreshContext.getPartitionMappings());
for (String relatedPartition : relatedPartitions) {
res.add(relatedToMv.get(relatedPartition));
}
return res;
}

private static Map<String, String> getRelatedToMv(Map<String, Set<String>> mvToRelated) {
Map<String, String> res = Maps.newHashMap();
for (Entry<String, Set<String>> entry : mvToRelated.entrySet()) {
for (String relatedPartition : entry.getValue()) {
res.put(relatedPartition, entry.getKey());
}
}
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.jobs.executor.Analyzer;
import org.apache.doris.nereids.jobs.executor.TableCollectAndHookInitializer;
import org.apache.doris.nereids.jobs.executor.TablePartitionCollector;
import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob;
import org.apache.doris.nereids.jobs.rewrite.RewriteTopDownJob;
import org.apache.doris.nereids.jobs.rewrite.RootPlanTreeRewriteJob.RootRewriteJobContext;
Expand Down Expand Up @@ -228,6 +229,10 @@ public TableCollectAndHookInitializer newTableCollector() {
return new TableCollectAndHookInitializer(this);
}

public TablePartitionCollector newTablePartitionCollector() {
return new TablePartitionCollector(this);
}

public Analyzer newAnalyzer() {
return new Analyzer(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,21 @@ protected void collectAndLockTable(boolean showPlanProcess) {
}
}

protected void collectTableUsedPartitions(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start to collect table used partition");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should log queryId?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same to org.apache.doris.nereids.NereidsPlanner#analyze, i think should not add

}
keepOrShowPlanProcess(showPlanProcess, () -> cascadesContext.newTablePartitionCollector().execute());
NereidsTracer.logImportantTime("EndCollectTablePartitions");
if (LOG.isDebugEnabled()) {
LOG.debug("Start to collect table used partition");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same to org.apache.doris.nereids.NereidsPlanner#analyze, i think should not add

}
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile()
.setNereidsCollectTablePartitionFinishTime();
}
}

protected void analyze(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start analyze plan");
Expand Down Expand Up @@ -403,6 +418,10 @@ protected void rewrite(boolean showPlanProcess) {
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsRewriteTime();
}
// collect partitions table used, this is for query rewrite by materialized view
// this is needed before init hook
collectTableUsedPartitions(showPlanProcess);
cascadesContext.getStatementContext().getPlannerHooks().forEach(hook -> hook.afterRewrite(this));
}

// DependsRules: EnsureProjectOnTopJoin.class
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/nereids/PlannerHook.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,16 @@ default void beforeAnalyze(NereidsPlanner planner) {
*/
default void afterAnalyze(NereidsPlanner planner) {
}

/**
* the hook before rewrite
*/
default void beforeRewrite(NereidsPlanner planner) {
}

/**
* the hook after rewrite
*/
default void afterRewrite(NereidsPlanner planner) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.catalog.constraint.TableIdentifier;
Expand All @@ -30,6 +31,7 @@
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccTable;
import org.apache.doris.datasource.mvcc.MvccTableInfo;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.hint.Hint;
import org.apache.doris.nereids.hint.UseMvHint;
Expand Down Expand Up @@ -62,6 +64,7 @@
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
Expand Down Expand Up @@ -108,6 +111,7 @@ public enum TableFrom {
private ConnectContext connectContext;

private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final Stopwatch materializedViewStopwatch = Stopwatch.createUnstarted();

@GuardedBy("this")
private final Map<String, Supplier<Object>> contextCacheMap = Maps.newLinkedHashMap();
Expand Down Expand Up @@ -176,7 +180,14 @@ public enum TableFrom {

// tables in this query directly
private final Map<List<String>, TableIf> tables = Maps.newHashMap();
// tables maybe used by mtmv rewritten in this query
// tables maybe used by mtmv rewritten in this query,
// this contains mvs which use table in tables and the tables in mvs
// such as
// mv1 use t1, t2.
// mv2 use mv1, t3, t4.
// mv3 use t3, t4, t5
// if query is: select * from t2 join t5
// mtmvRelatedTables is mv1, mv2, mv3, t1, t2, t3, t4, t5
private final Map<List<String>, TableIf> mtmvRelatedTables = Maps.newHashMap();
// insert into target tables
private final Map<List<String>, TableIf> insertTargetTables = Maps.newHashMap();
Expand Down Expand Up @@ -216,6 +227,16 @@ public enum TableFrom {

private boolean privChecked;

// if greater than 0 means the duration has used
private long materializedViewRewriteDuration = 0L;

// Record used table and it's used partitions
private final Multimap<List<String>, Pair<RelationId, Set<String>>> tableUsedPartitionNameMap =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use List+RelationId as key

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the later, would get by List which is table qualifier, so the key is List

HashMultimap.create();

// Record mtmv and valid partitions map because this is time-consuming behavior
private final Map<BaseTableInfo, Collection<Partition>> mvCanRewritePartitionsMap = new HashMap<>();

/// for dictionary sink.
private List<Backend> usedBackendsDistributing; // report used backends after done distribute planning.
private long dictionaryUsedSrcVersion; // base table data version used in this refreshing.
Expand Down Expand Up @@ -361,6 +382,18 @@ public Stopwatch getStopwatch() {
return stopwatch;
}

public Stopwatch getMaterializedViewStopwatch() {
return materializedViewStopwatch;
}

public long getMaterializedViewRewriteDuration() {
return materializedViewRewriteDuration;
}

public void addMaterializedViewRewriteDuration(long millisecond) {
materializedViewRewriteDuration += millisecond;
}

public void setMaxNAryInnerJoin(int maxNAryInnerJoin) {
if (maxNAryInnerJoin > this.maxNAryInnerJoin) {
this.maxNAryInnerJoin = maxNAryInnerJoin;
Expand Down Expand Up @@ -816,6 +849,14 @@ public void setPartialLoadDictionary(boolean partialLoadDictionary) {
this.partialLoadDictionary = partialLoadDictionary;
}

public Multimap<List<String>, Pair<RelationId, Set<String>>> getTableUsedPartitionNameMap() {
return tableUsedPartitionNameMap;
}

public Map<BaseTableInfo, Collection<Partition>> getMvCanRewritePartitionsMap() {
return mvCanRewritePartitionsMap;
}

public void setPrepareStage(boolean isPrepare) {
this.prepareStage = isPrepare;
}
Expand Down
Loading