Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class SummaryProfile {
public static final String NEREIDS_LOCK_TABLE_TIME = "Nereids Lock Table Time";
public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time";
public static final String NEREIDS_REWRITE_TIME = "Nereids Rewrite Time";

public static final String NEREIDS_COLLECT_TABLE_PARTITION_TIME = "Nereids Collect Table Partition Time";
public static final String NEREIDS_OPTIMIZE_TIME = "Nereids Optimize Time";
public static final String NEREIDS_TRANSLATE_TIME = "Nereids Translate Time";
public static final String NEREIDS_GARBAGE_COLLECT_TIME = "Nereids GarbageCollect Time";
Expand Down Expand Up @@ -198,6 +200,8 @@ public class SummaryProfile {
private long parseSqlFinishTime = -1;

private long nereidsLockTableFinishTime = -1;

private long nereidsCollectTablePartitionFinishTime = -1;
private long nereidsAnalysisFinishTime = -1;
private long nereidsRewriteFinishTime = -1;
private long nereidsOptimizeFinishTime = -1;
Expand Down Expand Up @@ -318,6 +322,8 @@ private void updateExecutionSummaryProfile() {
executionSummaryProfile.addInfoString(NEREIDS_LOCK_TABLE_TIME, getPrettyNereidsLockTableTime());
executionSummaryProfile.addInfoString(NEREIDS_ANALYSIS_TIME, getPrettyNereidsAnalysisTime());
executionSummaryProfile.addInfoString(NEREIDS_REWRITE_TIME, getPrettyNereidsRewriteTime());
executionSummaryProfile.addInfoString(NEREIDS_COLLECT_TABLE_PARTITION_TIME,
getPrettyNereidsCollectTablePartitionTime());
executionSummaryProfile.addInfoString(NEREIDS_OPTIMIZE_TIME, getPrettyNereidsOptimizeTime());
executionSummaryProfile.addInfoString(NEREIDS_TRANSLATE_TIME, getPrettyNereidsTranslateTime());
executionSummaryProfile.addInfoString(NEREIDS_GARBAGE_COLLECT_TIME, getPrettyNereidsGarbageCollectionTime());
Expand Down Expand Up @@ -406,6 +412,10 @@ public void setNereidsLockTableFinishTime() {
this.nereidsLockTableFinishTime = TimeUtils.getStartTimeMs();
}

public void setNereidsCollectTablePartitionFinishTime() {
this.nereidsCollectTablePartitionFinishTime = TimeUtils.getStartTimeMs();
}

public void setNereidsAnalysisTime() {
this.nereidsAnalysisFinishTime = TimeUtils.getStartTimeMs();
}
Expand Down Expand Up @@ -663,8 +673,13 @@ public String getPrettyNereidsRewriteTime() {
return getPrettyTime(nereidsRewriteFinishTime, nereidsAnalysisFinishTime, TUnit.TIME_MS);
}


public String getPrettyNereidsCollectTablePartitionTime() {
return getPrettyTime(nereidsCollectTablePartitionFinishTime, nereidsRewriteFinishTime, TUnit.TIME_MS);
}

public String getPrettyNereidsOptimizeTime() {
return getPrettyTime(nereidsOptimizeFinishTime, nereidsRewriteFinishTime, TUnit.TIME_MS);
return getPrettyTime(nereidsOptimizeFinishTime, nereidsCollectTablePartitionFinishTime, TUnit.TIME_MS);
}

public String getPrettyNereidsTranslateTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
Expand All @@ -27,6 +28,7 @@
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.nereids.rules.exploration.mv.PartitionCompensator;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
Expand All @@ -43,6 +45,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -82,13 +85,21 @@ public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos, ConnectContex
boolean forceConsistent, BiPredicate<ConnectContext, MTMV> predicate) {
Set<MTMV> res = Sets.newLinkedHashSet();
Set<BaseTableInfo> mvInfos = getMTMVInfos(tableInfos);
Map<List<String>, Set<String>> queryUsedPartitions = PartitionCompensator.getQueryUsedPartitions(
ctx.getStatementContext());

for (BaseTableInfo tableInfo : mvInfos) {
try {
MTMV mtmv = (MTMV) MTMVUtil.getTable(tableInfo);
if (predicate.test(ctx, mtmv)) {
continue;
}
if (isMVPartitionValid(mtmv, ctx, forceConsistent)) {
if (!mtmv.isUseForRewrite()) {
continue;
}
BaseTableInfo relatedTableInfo = mtmv.getMvPartitionInfo().getRelatedTableInfo();
if (isMVPartitionValid(mtmv, ctx, forceConsistent,
relatedTableInfo == null ? null : queryUsedPartitions.get(relatedTableInfo.toList()))) {
res.add(mtmv);
}
} catch (Exception e) {
Expand Down Expand Up @@ -117,10 +128,15 @@ public Set<MTMV> getAllMTMVs(List<BaseTableInfo> tableInfos) {
}

@VisibleForTesting
public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean forceConsistent) {
public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean forceConsistent,
Set<String> relatedPartitions) {
long currentTimeMillis = System.currentTimeMillis();
return !CollectionUtils
.isEmpty(MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMillis, forceConsistent));
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil.getMTMVCanRewritePartitions(
mtmv, ctx, currentTimeMillis, forceConsistent, relatedPartitions);
// MTMVRewriteUtil.getMTMVCanRewritePartitions is time-consuming behavior, So record for used later
ctx.getStatementContext().getMvCanRewritePartitionsMap().putIfAbsent(
new BaseTableInfo(mtmv), mtmvCanRewritePartitions);
return !CollectionUtils.isEmpty(mtmvCanRewritePartitions);
}

private Set<BaseTableInfo> getMTMVInfos(List<BaseTableInfo> tableInfos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class MTMVRewriteUtil {
private static final Logger LOG = LogManager.getLogger(MTMVRewriteUtil.class);
Expand All @@ -43,7 +47,8 @@ public class MTMVRewriteUtil {
* @return
*/
public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx,
long currentTimeMills, boolean forceConsistent) {
long currentTimeMills, boolean forceConsistent,
Set<String> relatedPartitions) {
List<Partition> res = Lists.newArrayList();
Collection<Partition> allPartitions = mtmv.getPartitions();
MTMVRelation mtmvRelation = mtmv.getRelation();
Expand All @@ -55,6 +60,11 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
if (mtmvStatus.getState() != MTMVState.NORMAL || mtmvStatus.getRefreshState() == MTMVRefreshState.INIT) {
return res;
}
// if relatedPartitions is empty but not null, which means query no partitions
if (relatedPartitions != null && relatedPartitions.size() == 0) {
return res;
}
Set<String> mtmvNeedComparePartitions = null;
MTMVRefreshContext refreshContext = null;
// check gracePeriod
long gracePeriodMills = mtmv.getGracePeriod();
Expand All @@ -73,6 +83,14 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
return res;
}
}
if (mtmvNeedComparePartitions == null) {
mtmvNeedComparePartitions = getMtmvPartitionsByRelatedPartitions(mtmv, refreshContext,
relatedPartitions);
}
// if the partition which query not used, should not compare partition version
if (!mtmvNeedComparePartitions.contains(partition.getName())) {
continue;
}
try {
if (MTMVPartitionUtil.isMTMVPartitionSync(refreshContext, partition.getName(),
mtmvRelation.getBaseTablesOneLevel(),
Expand All @@ -86,4 +104,29 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
}
return res;
}

private static Set<String> getMtmvPartitionsByRelatedPartitions(MTMV mtmv, MTMVRefreshContext refreshContext,
Set<String> relatedPartitions) {
// if relatedPartitions is null, which means QueryPartitionCollector visitLogicalCatalogRelation can not
// get query used partitions, should get all mtmv partitions
if (relatedPartitions == null) {
return mtmv.getPartitionNames();
}
Set<String> res = Sets.newHashSet();
Map<String, String> relatedToMv = getRelatedToMv(refreshContext.getPartitionMappings());
for (String relatedPartition : relatedPartitions) {
res.add(relatedToMv.get(relatedPartition));
}
return res;
}

private static Map<String, String> getRelatedToMv(Map<String, Set<String>> mvToRelated) {
Map<String, String> res = Maps.newHashMap();
for (Entry<String, Set<String>> entry : mvToRelated.entrySet()) {
for (String relatedPartition : entry.getValue()) {
res.put(relatedPartition, entry.getKey());
}
}
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.jobs.executor.Analyzer;
import org.apache.doris.nereids.jobs.executor.TableCollectAndHookInitializer;
import org.apache.doris.nereids.jobs.executor.TablePartitionCollector;
import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob;
import org.apache.doris.nereids.jobs.rewrite.RewriteTopDownJob;
import org.apache.doris.nereids.jobs.rewrite.RootPlanTreeRewriteJob.RootRewriteJobContext;
Expand Down Expand Up @@ -228,6 +229,10 @@ public TableCollectAndHookInitializer newTableCollector() {
return new TableCollectAndHookInitializer(this);
}

public TablePartitionCollector newTablePartitionCollector() {
return new TablePartitionCollector(this);
}

public Analyzer newAnalyzer() {
return new Analyzer(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,22 @@ protected void collectAndLockTable(boolean showPlanProcess) {
}
}

private void analyze(boolean showPlanProcess) {
protected void collectTableUsedPartitions(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start to collect table used partition");
}
keepOrShowPlanProcess(showPlanProcess, () -> cascadesContext.newTablePartitionCollector().execute());
NereidsTracer.logImportantTime("EndCollectTablePartitions");
if (LOG.isDebugEnabled()) {
LOG.debug("Start to collect table used partition");
}
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile()
.setNereidsCollectTablePartitionFinishTime();
}
}

protected void analyze(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start analyze plan");
}
Expand Down Expand Up @@ -376,6 +391,10 @@ private void rewrite(boolean showPlanProcess) {
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsRewriteTime();
}
// collect partitions table used, this is for query rewrite by materialized view
// this is needed before init hook
collectTableUsedPartitions(showPlanProcess);
cascadesContext.getStatementContext().getPlannerHooks().forEach(hook -> hook.afterRewrite(this));
}

// DependsRules: EnsureProjectOnTopJoin.class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,16 @@ default void beforeAnalyze(NereidsPlanner planner) {
*/
default void afterAnalyze(NereidsPlanner planner) {
}

/**
* the hook before rewrite
*/
default void beforeRewrite(NereidsPlanner planner) {
}

/**
* the hook after rewrite
*/
default void afterRewrite(NereidsPlanner planner) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.catalog.constraint.TableIdentifier;
Expand All @@ -29,6 +30,7 @@
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccTable;
import org.apache.doris.datasource.mvcc.MvccTableInfo;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.hint.Hint;
import org.apache.doris.nereids.memo.Group;
Expand Down Expand Up @@ -60,6 +62,7 @@
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
Expand Down Expand Up @@ -106,6 +109,7 @@ public enum TableFrom {
private ConnectContext connectContext;

private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final Stopwatch materializedViewStopwatch = Stopwatch.createUnstarted();

@GuardedBy("this")
private final Map<String, Supplier<Object>> contextCacheMap = Maps.newLinkedHashMap();
Expand Down Expand Up @@ -175,7 +179,14 @@ public enum TableFrom {

// tables in this query directly
private final Map<List<String>, TableIf> tables = Maps.newHashMap();
// tables maybe used by mtmv rewritten in this query
// tables maybe used by mtmv rewritten in this query,
// this contains mvs which use table in tables and the tables in mvs
// such as
// mv1 use t1, t2.
// mv2 use mv1, t3, t4.
// mv3 use t3, t4, t5
// if query is: select * from t2 join t5
// mtmvRelatedTables is mv1, mv2, mv3, t1, t2, t3, t4, t5
private final Map<List<String>, TableIf> mtmvRelatedTables = Maps.newHashMap();
// insert into target tables
private final Map<List<String>, TableIf> insertTargetTables = Maps.newHashMap();
Expand Down Expand Up @@ -211,6 +222,16 @@ public enum TableFrom {

private boolean privChecked;

// if greater than 0 means the duration has used
private long materializedViewRewriteDuration = 0L;

// Record used table and it's used partitions
private final Multimap<List<String>, Pair<RelationId, Set<String>>> tableUsedPartitionNameMap =
HashMultimap.create();

// Record mtmv and valid partitions map because this is time-consuming behavior
private final Map<BaseTableInfo, Collection<Partition>> mvCanRewritePartitionsMap = new HashMap<>();

public StatementContext() {
this(ConnectContext.get(), null, 0);
}
Expand Down Expand Up @@ -339,6 +360,26 @@ public Stopwatch getStopwatch() {
return stopwatch;
}

public Stopwatch getMaterializedViewStopwatch() {
return materializedViewStopwatch;
}

public long getMaterializedViewRewriteDuration() {
return materializedViewRewriteDuration;
}

public void addMaterializedViewRewriteDuration(long millisecond) {
materializedViewRewriteDuration += millisecond;
}

public Multimap<List<String>, Pair<RelationId, Set<String>>> getTableUsedPartitionNameMap() {
return tableUsedPartitionNameMap;
}

public Map<BaseTableInfo, Collection<Partition>> getMvCanRewritePartitionsMap() {
return mvCanRewritePartitionsMap;
}

public void setMaxNAryInnerJoin(int maxNAryInnerJoin) {
if (maxNAryInnerJoin > this.maxNAryInnerJoin) {
this.maxNAryInnerJoin = maxNAryInnerJoin;
Expand Down
Loading