diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 5ee52ca3b7e0a3..f86bbb02c317e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1260,6 +1260,12 @@ public PlanFragment visitPhysicalFilter(PhysicalFilter filter, P MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); + if (CollectionUtils.isNotEmpty(dataStreamSink.getConjuncts()) + || CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) { + String errMsg = "generate invalid plan \n" + filter.treeString(); + LOG.warn(errMsg); + throw new AnalysisException(errMsg); + } filter.getConjuncts().stream() .map(e -> ExpressionTranslator.translate(e, context)) .forEach(dataStreamSink::addConjunct); @@ -1267,24 +1273,28 @@ public PlanFragment visitPhysicalFilter(PhysicalFilter filter, P } PlanNode planNode = inputFragment.getPlanRoot(); - Plan child = filter.child(); - while (child instanceof PhysicalLimit) { - child = ((PhysicalLimit) child).child(); - } - if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode - // this means we have filter->limit->project, need a SelectNode - || child instanceof PhysicalProject) { - // the three nodes don't support conjuncts, need create a SelectNode to filter data + // the three nodes don't support conjuncts, need create a SelectNode to filter data + if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode) { SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), planNode); selectNode.setNereidsId(filter.getId()); addConjunctsToPlanNode(filter, selectNode, context); addPlanRoot(inputFragment, selectNode, filter); } else { if (!(filter.child(0) instanceof AbstractPhysicalJoin)) { + // already have filter on this node, we should not override it, so need a new node + if (!planNode.getConjuncts().isEmpty() + // already have project on this node, filter need execute after project, so need a new node + || CollectionUtils.isNotEmpty(planNode.getProjectList()) + // already have limit on this node, filter need execute after limit, so need a new node + || planNode.hasLimit()) { + planNode = new SelectNode(context.nextPlanNodeId(), planNode); + planNode.setNereidsId(filter.getId()); + addPlanRoot(inputFragment, planNode, filter); + } addConjunctsToPlanNode(filter, planNode, context); - updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), filter); } } + updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), filter); // in ut, filter.stats may be null if (filter.getStats() != null) { inputFragment.getPlanRoot().setCardinalityAfterFilter((long) filter.getStats().getRowCount()); @@ -1893,8 +1903,15 @@ public PlanFragment visitPhysicalProject(PhysicalProject project } PlanFragment inputFragment = project.child(0).accept(this, context); - PlanNode inputPlanNode = inputFragment.getPlanRoot(); + // this means already have project on this node, filter need execute after project, so need a new node + if (CollectionUtils.isNotEmpty(inputPlanNode.getProjectList())) { + SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), inputPlanNode); + selectNode.setNereidsId(project.getId()); + addPlanRoot(inputFragment, selectNode, project); + inputPlanNode = selectNode; + } + List projectionExprs = null; List allProjectionExprs = Lists.newArrayList(); List slots = null; @@ -1931,6 +1948,11 @@ public PlanFragment visitPhysicalProject(PhysicalProject project MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); + if (CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) { + String errMsg = "generate invalid plan \n" + project.treeString(); + LOG.warn(errMsg); + throw new AnalysisException(errMsg); + } TupleDescriptor projectionTuple = generateTupleDesc(slots, null, context); dataStreamSink.setProjections(projectionExprs); dataStreamSink.setOutputTupleDesc(projectionTuple);