From ab21d4e2a3279fd815f71e122268d37a8cd46053 Mon Sep 17 00:00:00 2001 From: Sridhar R Manikarnike Date: Mon, 8 Sep 2025 05:43:52 +0000 Subject: [PATCH] Changes to break plan when encountering JOIN node children with same CTE ID --- .../nereids/rules/rewrite/CTEInline.java | 47 +++++++++++++++++++ .../org/apache/doris/qe/SessionVariable.java | 4 ++ 2 files changed, 51 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java index 22ec72c99c5559..d0c058ab62fd76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java @@ -21,16 +21,20 @@ import org.apache.doris.nereids.trees.copier.DeepCopierContext; import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier; import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.CTEId; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer; +import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.qe.ConnectContext; @@ -39,7 +43,12 @@ import com.google.common.collect.Lists; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; /** * pull up LogicalCteAnchor to the top of plan to avoid CteAnchor break other rewrite rules pattern @@ -49,6 +58,8 @@ */ public class CTEInline extends DefaultPlanRewriter> implements CustomRewriter { + private final Map> cteIdPlanMap = new HashMap<>(); + @Override public Plan rewriteRoot(Plan plan, JobContext jobContext) { Plan root = plan.accept(this, null); @@ -72,13 +83,49 @@ public Plan visitLogicalCTEAnchor(LogicalCTEAnchor consumers = cteAnchor.child(1).collectToList(p -> { if (p instanceof LogicalCTEConsumer) { + if (((LogicalCTEConsumer) p).getCteId().equals(cteAnchor.getCteId())) { + if (currentJoinPlan[0] != null) { + cteIdPlanMap.computeIfAbsent(((LogicalCTEConsumer) p).getCteId(), + x -> new ArrayList<>()).add(currentJoinPlan[0]); + } + return true; + } return ((LogicalCTEConsumer) p).getCteId().equals(cteAnchor.getCteId()); } + if (p instanceof LogicalJoin + && (!(ConnectContext.get().getSessionVariable().enableJoinSameCteChild))) { + currentJoinPlan[0] = (Plan) p; + } + if ((p instanceof LogicalUnion || p instanceof LogicalAggregate) + && (!(ConnectContext.get().getSessionVariable().enableJoinSameCteChild))) { + currentJoinPlan[0] = (Plan) null; + } return false; }); ConnectContext connectContext = ConnectContext.get(); + if (!connectContext.getSessionVariable().enableJoinSameCteChild) { + for (LogicalCTEConsumer consumer : consumers) { + CTEId consumerCteId = consumer.getCteId(); + List plans = (cteIdPlanMap != null) ? cteIdPlanMap.get(consumerCteId) : null; + + if (plans != null) { + Set uniqueItems = new HashSet<>(); + List duplicates = plans.stream() + .filter(plan -> !uniqueItems.add(plan)) + .collect(Collectors.toList()); + + if (!duplicates.isEmpty()) { + // should inline + Plan root = cteAnchor.right().accept(this, (LogicalCTEProducer) cteAnchor.left()); + //process child + return root.accept(this, null); + } + } + } + } if (connectContext.getSessionVariable().enableCTEMaterialize && consumers.size() > connectContext.getSessionVariable().inlineCTEReferencedThreshold) { // not inline diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 7712b4fc2de790..93ae0ba92adc29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -870,6 +870,7 @@ public static double getHotValueThreshold() { public static final String ENABLE_PREFER_CACHED_ROWSET = "enable_prefer_cached_rowset"; public static final String QUERY_FRESHNESS_TOLERANCE_MS = "query_freshness_tolerance_ms"; + public static final String ENABLE_JOIN_SAME_CTE_CHILD = "enable_join_same_cte_child"; static { affectQueryResultFields = Arrays.stream(SessionVariable.class.getDeclaredFields()) @@ -2698,6 +2699,9 @@ public void setDetailShapePlanNodes(String detailShapePlanNodes) { "用于控制结果反序列化时 thrift 字段的最大值,当遇到类似\"MaxMessageSize reached\"这样的错误时可以考虑修改该参数"}) public int maxMsgSizeOfResultReceiver = TConfiguration.DEFAULT_MAX_MESSAGE_SIZE; + @VariableMgr.VarAttr(name = ENABLE_JOIN_SAME_CTE_CHILD, description = {"Enable join in plan for same CTE child", + "Enable join in plan for same CTE child"}) + public boolean enableJoinSameCteChild = true; // CLOUD_VARIABLES_BEGIN @VariableMgr.VarAttr(name = CLOUD_CLUSTER, alias = {COMPUTE_GROUP})