Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUnit;
Expand Down Expand Up @@ -240,6 +242,8 @@ public class SummaryProfile {

@SerializedName(value = "nereidsCollectTablePartitionFinishTime")
private long nereidsCollectTablePartitionFinishTime = -1;
@SerializedName(value = "nereidsCollectTablePartitionTime")
private long nereidsCollectTablePartitionTime = 0;
@SerializedName(value = "nereidsAnalysisFinishTime")
private long nereidsAnalysisFinishTime = -1;
@SerializedName(value = "nereidsRewriteFinishTime")
Expand Down Expand Up @@ -553,6 +557,10 @@ public void setNereidsCollectTablePartitionFinishTime() {
this.nereidsCollectTablePartitionFinishTime = TimeUtils.getStartTimeMs();
}

/**
 * Accumulates elapsed time spent collecting table used partitions.
 * Called once per collection pass; the counter is cumulative across calls.
 *
 * @param elapsed elapsed time to add, in the same unit as the stored counter
 *                (presumably milliseconds, matching TimeUtils.getElapsedTimeMs — confirm)
 */
public void addCollectTablePartitionTime(long elapsed) {
    this.nereidsCollectTablePartitionTime = this.nereidsCollectTablePartitionTime + elapsed;
}

/**
 * Records the timestamp at which the Nereids analysis phase finished.
 * NOTE(review): despite the name, this sets {@code nereidsAnalysisFinishTime};
 * TimeUtils.getStartTimeMs() presumably returns the current time in millis — confirm.
 */
public void setNereidsAnalysisTime() {
    this.nereidsAnalysisFinishTime = TimeUtils.getStartTimeMs();
}
Expand Down Expand Up @@ -829,7 +837,9 @@ public String getPrettyNereidsRewriteTime() {


/**
 * Pretty-prints the total time spent collecting table used partitions:
 * the wall-clock span from rewrite finish to partition-collection finish,
 * plus any extra time accumulated via {@link #addCollectTablePartitionTime(long)}
 * during materialized-view rewrite.
 *
 * @return human-readable duration in milliseconds
 */
public String getPrettyNereidsCollectTablePartitionTime() {
    // Both finish timestamps default to -1 when never set; computing the span with an
    // unset value would yield a bogus (hugely negative) duration, so fall back to the
    // accumulated counter alone in that case.
    if (nereidsCollectTablePartitionFinishTime == -1 || nereidsRewriteFinishTime == -1) {
        return RuntimeProfile.printCounter(nereidsCollectTablePartitionTime, TUnit.TIME_MS);
    }
    long totalTime = nereidsCollectTablePartitionFinishTime
            - nereidsRewriteFinishTime + nereidsCollectTablePartitionTime;
    return RuntimeProfile.printCounter(totalTime, TUnit.TIME_MS);
}

public String getPrettyNereidsOptimizeTime() {
Expand Down Expand Up @@ -999,4 +1009,15 @@ public void write(DataOutput output) throws IOException {
/**
 * Sets the scan weight assigned to each backend for this query.
 * NOTE(review): stores the caller's map reference directly (no defensive copy);
 * callers should not mutate the map afterwards — confirm this is intended.
 *
 * @param assignedWeightPerBackend mapping from backend to its assigned weight
 */
public void setAssignedWeightPerBackend(Map<Backend, Long> assignedWeightPerBackend) {
    this.assignedWeightPerBackend = assignedWeightPerBackend;
}

/**
 * Resolves the SummaryProfile associated with the given connect context.
 * Falls back to the thread-local {@code ConnectContext.get()} when the argument
 * is null.
 *
 * @param connectContext explicit context, or null to use the thread-local one
 * @return the executor's summary profile, or null when no context or no executor
 *         is available
 */
public static SummaryProfile getSummaryProfile(ConnectContext connectContext) {
    ConnectContext ctx = connectContext != null ? connectContext : ConnectContext.get();
    if (ctx == null) {
        return null;
    }
    StmtExecutor executor = ctx.getExecutor();
    return executor == null ? null : executor.getSummaryProfile();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -85,7 +86,7 @@ public Set<MTMV> getAvailableMTMVs(Set<MTMV> candidateMTMVs, ConnectContext ctx,
boolean forceConsistent, BiPredicate<ConnectContext, MTMV> predicate) {
Set<MTMV> res = Sets.newLinkedHashSet();
Map<List<String>, Set<String>> queryUsedPartitions = PartitionCompensator.getQueryUsedPartitions(
ctx.getStatementContext());
ctx.getStatementContext(), new BitSet());
for (MTMV mtmv : candidateMTMVs) {
if (predicate.test(ctx, mtmv)) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.jobs.executor.Analyzer;
import org.apache.doris.nereids.jobs.executor.TableCollectAndHookInitializer;
import org.apache.doris.nereids.jobs.executor.TablePartitionCollector;
import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob;
import org.apache.doris.nereids.jobs.rewrite.RewriteTopDownJob;
import org.apache.doris.nereids.jobs.rewrite.RootPlanTreeRewriteJob.RootRewriteJobContext;
Expand Down Expand Up @@ -229,10 +228,6 @@ public TableCollectAndHookInitializer newTableCollector() {
return new TableCollectAndHookInitializer(this);
}

public TablePartitionCollector newTablePartitionCollector() {
return new TablePartitionCollector(this);
}

/** Creates a new {@link Analyzer} bound to this cascades context. */
public Analyzer newAnalyzer() {
    return new Analyzer(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,21 +369,6 @@ protected void collectAndLockTable(boolean showPlanProcess) {
}
}

/**
 * Collects the partitions used by each table in the plan (for materialized-view
 * rewrite) and records the finish timestamp into the summary profile.
 *
 * @param showPlanProcess whether to keep the plan-process trace while running the collector
 */
protected void collectTableUsedPartitions(boolean showPlanProcess) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Start to collect table used partition");
    }
    keepOrShowPlanProcess(showPlanProcess, () -> cascadesContext.newTablePartitionCollector().execute());
    NereidsTracer.logImportantTime("EndCollectTablePartitions");
    if (LOG.isDebugEnabled()) {
        // Bug fix: this message previously duplicated the "Start" log above even though
        // it runs after the collector has finished.
        LOG.debug("End to collect table used partition");
    }
    if (statementContext.getConnectContext().getExecutor() != null) {
        statementContext.getConnectContext().getExecutor().getSummaryProfile()
                .setNereidsCollectTablePartitionFinishTime();
    }
}

protected void analyze(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start analyze plan");
Expand Down Expand Up @@ -416,9 +401,6 @@ protected void rewrite(boolean showPlanProcess) {
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsRewriteTime();
}
// collect partitions table used, this is for query rewrite by materialized view
// this is needed before init hook
collectTableUsedPartitions(showPlanProcess);
cascadesContext.getStatementContext().getPlannerHooks().forEach(hook -> hook.afterRewrite(this));
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Id;
import org.apache.doris.common.Pair;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
Expand Down Expand Up @@ -298,7 +300,7 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv();
BaseTableInfo relatedTableInfo = mtmv.getMvPartitionInfo().getRelatedTableInfo();
Map<List<String>, Set<String>> queryUsedPartitions = PartitionCompensator.getQueryUsedPartitions(
cascadesContext.getConnectContext().getStatementContext());
cascadesContext.getStatementContext(), queryStructInfo.getRelationIdBitSet());
Set<String> relateTableUsedPartitions = queryUsedPartitions.get(relatedTableInfo.toList());
if (relateTableUsedPartitions == null) {
materializationContext.recordFailReason(queryStructInfo,
Expand Down Expand Up @@ -420,6 +422,18 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
logicalProperties, queryPlan.getLogicalProperties()));
continue;
}
// Need to collect the table used partitions again, because the rewritten plan may contain
// new relations and may itself take part in later rewrites, where the table used-partition
// info is required.
long startTimeMs = TimeUtils.getStartTimeMs();
try {
MaterializedViewUtils.collectTableUsedPartitions(rewrittenPlan, cascadesContext);
} finally {
SummaryProfile summaryProfile = SummaryProfile.getSummaryProfile(cascadesContext.getConnectContext());
if (summaryProfile != null) {
summaryProfile.addCollectTablePartitionTime(TimeUtils.getElapsedTimeMs(startTimeMs));
}
}
trySetStatistics(materializationContext, cascadesContext);
rewriteResults.add(rewrittenPlan);
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.PlannerHook;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.qe.ConnectContext;

Expand Down Expand Up @@ -60,6 +61,15 @@ public class InitMaterializationContextHook implements PlannerHook {

@Override
public void afterRewrite(NereidsPlanner planner) {
    CascadesContext cascadesContext = planner.getCascadesContext();
    // Collect the partitions each table uses; materialized-view rewrite needs this
    // before the init hook, because the init hook compares partition versions using it.
    MaterializedViewUtils.collectTableUsedPartitions(cascadesContext.getRewritePlan(), cascadesContext);
    StatementContext statementContext = cascadesContext.getStatementContext();
    if (statementContext.getConnectContext().getExecutor() != null) {
        statementContext.getConnectContext().getExecutor().getSummaryProfile()
                .setNereidsCollectTablePartitionFinishTime();
    }
    // Consistency: reuse the local instead of re-fetching planner.getCascadesContext().
    initMaterializationContext(cascadesContext);
}

Expand Down Expand Up @@ -142,12 +152,17 @@ private List<MaterializationContext> createAsyncMaterializationContext(CascadesC
// so regenerate the struct info table bitset
StructInfo mvStructInfo = mtmvCache.getStructInfo();
BitSet tableBitSetInCurrentCascadesContext = new BitSet();
mvStructInfo.getRelations().forEach(relation -> tableBitSetInCurrentCascadesContext.set(
cascadesContext.getStatementContext().getTableId(relation.getTable()).asInt()));
BitSet relationIdBitSetInCurrentCascadesContext = new BitSet();
mvStructInfo.getRelations().forEach(relation -> {
tableBitSetInCurrentCascadesContext.set(
cascadesContext.getStatementContext().getTableId(relation.getTable()).asInt());
relationIdBitSetInCurrentCascadesContext.set(relation.getRelationId().asInt());
});
asyncMaterializationContext.add(new AsyncMaterializationContext(materializedView,
mtmvCache.getLogicalPlan(), mtmvCache.getOriginalPlan(), ImmutableList.of(),
ImmutableList.of(), cascadesContext,
mtmvCache.getStructInfo().withTableBitSet(tableBitSetInCurrentCascadesContext)));
mtmvCache.getStructInfo().withTableBitSet(tableBitSetInCurrentCascadesContext,
relationIdBitSetInCurrentCascadesContext)));
} catch (Exception e) {
LOG.warn(String.format("MaterializationContext init mv cache generate fail, current queryId is %s",
cascadesContext.getConnectContext().getQueryIdentifier()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.nereids.rules.analysis.BindRelation;
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.rules.rewrite.QueryPartitionCollector;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
Expand Down Expand Up @@ -300,6 +301,15 @@ public static List<Expression> extractNondeterministicFunction(Plan plan) {
return nondeterministicFunctions;
}

/**
 * Collects the partitions used by each table in the given plan — presumably recorded
 * (keyed by relation id) through the visited {@code cascadesContext} for materialized-view
 * rewrite partition union; confirm against QueryPartitionCollector. The collection is not
 * cumulative: if this is called multiple times, previously recorded results should be
 * cleared first.
 */
public static void collectTableUsedPartitions(Plan plan, CascadesContext cascadesContext) {
    // the recorded partition is based on relation id
    plan.accept(new QueryPartitionCollector(), cascadesContext);
}

/**
* Check the query if Contains query operator
* Such sql as following should return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -164,8 +165,12 @@ public static boolean needUnionRewrite(MaterializationContext materializationCon
/**
* Get query used partitions
* this is calculated from tableUsedPartitionNameMap and tables in statementContext
* */
public static Map<List<String>, Set<String>> getQueryUsedPartitions(StatementContext statementContext) {
*
* @param customRelationIdSet if union compensation occurs, the set of partitions the query
* uses changes, so the used partitions must be resolved via this relation id set
*/
public static Map<List<String>, Set<String>> getQueryUsedPartitions(StatementContext statementContext,
BitSet customRelationIdSet) {
// get table used partitions
// if table is not in statementContext().getTables() which means the table is partition prune as empty relation
Multimap<List<String>, Pair<RelationId, Set<String>>> tableUsedPartitionNameMap = statementContext
Expand All @@ -174,7 +179,7 @@ public static Map<List<String>, Set<String>> getQueryUsedPartitions(StatementCon
// if value is null, means query all partitions
// if value is not empty, means query some partitions
Map<List<String>, Set<String>> queryUsedRelatedTablePartitionsMap = new HashMap<>();
outer:
tableLoop:
for (Map.Entry<List<String>, TableIf> queryUsedTableEntry : statementContext.getTables().entrySet()) {
Set<String> usedPartitionSet = new HashSet<>();
Collection<Pair<RelationId, Set<String>>> tableUsedPartitions =
Expand All @@ -185,11 +190,20 @@ public static Map<List<String>, Set<String>> getQueryUsedPartitions(StatementCon
continue;
}
for (Pair<RelationId, Set<String>> partitionPair : tableUsedPartitions) {
if (ALL_PARTITIONS.equals(partitionPair)) {
queryUsedRelatedTablePartitionsMap.put(queryUsedTableEntry.getKey(), null);
continue outer;
if (!customRelationIdSet.isEmpty()) {
if (ALL_PARTITIONS.equals(partitionPair)) {
continue;
}
if (customRelationIdSet.get(partitionPair.key().asInt())) {
usedPartitionSet.addAll(partitionPair.value());
}
} else {
if (ALL_PARTITIONS.equals(partitionPair)) {
queryUsedRelatedTablePartitionsMap.put(queryUsedTableEntry.getKey(), null);
continue tableLoop;
}
usedPartitionSet.addAll(partitionPair.value());
}
usedPartitionSet.addAll(partitionPair.value());
}
}
queryUsedRelatedTablePartitionsMap.put(queryUsedTableEntry.getKey(), usedPartitionSet);
Expand Down
Loading