Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@
import org.apache.doris.nereids.trees.copier.DeepCopierContext;
import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -39,7 +43,12 @@
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* pull up LogicalCteAnchor to the top of plan to avoid CteAnchor break other rewrite rules pattern
Expand All @@ -49,6 +58,8 @@
*/
public class CTEInline extends DefaultPlanRewriter<LogicalCTEProducer<?>> implements CustomRewriter {

private final Map<CTEId, List<Plan>> cteIdPlanMap = new HashMap<>();

@Override
public Plan rewriteRoot(Plan plan, JobContext jobContext) {
Plan root = plan.accept(this, null);
Expand All @@ -72,13 +83,49 @@ public Plan visitLogicalCTEAnchor(LogicalCTEAnchor<? extends Plan, ? extends Pla
return cteAnchor.withChildren(children);
} else {
// process this anchor
final Plan[] currentJoinPlan = {null};
List<LogicalCTEConsumer> consumers = cteAnchor.child(1).collectToList(p -> {
if (p instanceof LogicalCTEConsumer) {
if (((LogicalCTEConsumer) p).getCteId().equals(cteAnchor.getCteId())) {
if (currentJoinPlan[0] != null) {
cteIdPlanMap.computeIfAbsent(((LogicalCTEConsumer) p).getCteId(),
x -> new ArrayList<>()).add(currentJoinPlan[0]);
}
return true;
}
return ((LogicalCTEConsumer) p).getCteId().equals(cteAnchor.getCteId());
}
if (p instanceof LogicalJoin
&& (!(ConnectContext.get().getSessionVariable().enableJoinSameCteChild))) {
currentJoinPlan[0] = (Plan) p;
}
if ((p instanceof LogicalUnion || p instanceof LogicalAggregate)
&& (!(ConnectContext.get().getSessionVariable().enableJoinSameCteChild))) {
currentJoinPlan[0] = (Plan) null;
}
return false;
});
ConnectContext connectContext = ConnectContext.get();
if (!connectContext.getSessionVariable().enableJoinSameCteChild) {
for (LogicalCTEConsumer consumer : consumers) {
CTEId consumerCteId = consumer.getCteId();
List<Plan> plans = (cteIdPlanMap != null) ? cteIdPlanMap.get(consumerCteId) : null;

if (plans != null) {
Set<Plan> uniqueItems = new HashSet<>();
List<Plan> duplicates = plans.stream()
.filter(plan -> !uniqueItems.add(plan))
.collect(Collectors.toList());

if (!duplicates.isEmpty()) {
// should inline
Plan root = cteAnchor.right().accept(this, (LogicalCTEProducer<?>) cteAnchor.left());
//process child
return root.accept(this, null);
}
}
}
}
if (connectContext.getSessionVariable().enableCTEMaterialize
&& consumers.size() > connectContext.getSessionVariable().inlineCTEReferencedThreshold) {
// not inline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ public static double getHotValueThreshold() {

public static final String ENABLE_PREFER_CACHED_ROWSET = "enable_prefer_cached_rowset";
public static final String QUERY_FRESHNESS_TOLERANCE_MS = "query_freshness_tolerance_ms";
public static final String ENABLE_JOIN_SAME_CTE_CHILD = "enable_join_same_cte_child";

static {
affectQueryResultFields = Arrays.stream(SessionVariable.class.getDeclaredFields())
Expand Down Expand Up @@ -2698,6 +2699,9 @@ public void setDetailShapePlanNodes(String detailShapePlanNodes) {
"用于控制结果反序列化时 thrift 字段的最大值,当遇到类似\"MaxMessageSize reached\"这样的错误时可以考虑修改该参数"})
public int maxMsgSizeOfResultReceiver = TConfiguration.DEFAULT_MAX_MESSAGE_SIZE;

@VariableMgr.VarAttr(name = ENABLE_JOIN_SAME_CTE_CHILD, description = {"Enable join in plan for same CTE child",
"Enable join in plan for same CTE child"})
public boolean enableJoinSameCteChild = true;

// CLOUD_VARIABLES_BEGIN
@VariableMgr.VarAttr(name = CLOUD_CLUSTER, alias = {COMPUTE_GROUP})
Expand Down