diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 5ee52ca3b7e0a3..f86bbb02c317e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1260,6 +1260,12 @@ public PlanFragment visitPhysicalFilter(PhysicalFilter filter, P MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); + if (CollectionUtils.isNotEmpty(dataStreamSink.getConjuncts()) + || CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) { + String errMsg = "generate invalid plan \n" + filter.treeString(); + LOG.warn(errMsg); + throw new AnalysisException(errMsg); + } filter.getConjuncts().stream() .map(e -> ExpressionTranslator.translate(e, context)) .forEach(dataStreamSink::addConjunct); @@ -1267,24 +1273,28 @@ public PlanFragment visitPhysicalFilter(PhysicalFilter filter, P } PlanNode planNode = inputFragment.getPlanRoot(); - Plan child = filter.child(); - while (child instanceof PhysicalLimit) { - child = ((PhysicalLimit) child).child(); - } - if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode - // this means we have filter->limit->project, need a SelectNode - || child instanceof PhysicalProject) { - // the three nodes don't support conjuncts, need create a SelectNode to filter data + // the three nodes don't support conjuncts, need create a SelectNode to filter data + if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode) { SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), planNode); selectNode.setNereidsId(filter.getId()); addConjunctsToPlanNode(filter, selectNode, context); addPlanRoot(inputFragment, selectNode, filter); } else { if (!(filter.child(0) instanceof AbstractPhysicalJoin)) { + // already have filter on this node, we should not override it, so need a new node + if (!planNode.getConjuncts().isEmpty() + // already have project on this node, filter need execute after project, so need a new node + || CollectionUtils.isNotEmpty(planNode.getProjectList()) + // already have limit on this node, filter need execute after limit, so need a new node + || planNode.hasLimit()) { + planNode = new SelectNode(context.nextPlanNodeId(), planNode); + planNode.setNereidsId(filter.getId()); + addPlanRoot(inputFragment, planNode, filter); + } addConjunctsToPlanNode(filter, planNode, context); - updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), filter); } } + updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), filter); // in ut, filter.stats may be null if (filter.getStats() != null) { inputFragment.getPlanRoot().setCardinalityAfterFilter((long) filter.getStats().getRowCount()); @@ -1893,8 +1903,15 @@ public PlanFragment visitPhysicalProject(PhysicalProject project } PlanFragment inputFragment = project.child(0).accept(this, context); - PlanNode inputPlanNode = inputFragment.getPlanRoot(); + // this means already have project on this node, filter need execute after project, so need a new node + if (CollectionUtils.isNotEmpty(inputPlanNode.getProjectList())) { + SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), inputPlanNode); + selectNode.setNereidsId(project.getId()); + addPlanRoot(inputFragment, selectNode, project); + inputPlanNode = selectNode; + } + List projectionExprs = null; List allProjectionExprs = Lists.newArrayList(); List slots = null; @@ -1931,6 +1948,11 @@ public PlanFragment visitPhysicalProject(PhysicalProject project MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); + if (CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) { + String errMsg = "generate invalid plan \n" + project.treeString(); + LOG.warn(errMsg); + throw new AnalysisException(errMsg); + } TupleDescriptor projectionTuple = generateTupleDesc(slots, null, context); dataStreamSink.setProjections(projectionExprs); dataStreamSink.setOutputTupleDesc(projectionTuple); diff --git a/regression-test/data/nereids_p0/physical_translator/test_physical_translator.out b/regression-test/data/nereids_p0/physical_translator/test_physical_translator.out new file mode 100644 index 00000000000000..d5166d53ce7031 --- /dev/null +++ b/regression-test/data/nereids_p0/physical_translator/test_physical_translator.out @@ -0,0 +1,24 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !continue_project_shape -- +PhysicalResultSink +--PhysicalProject[t.a, x AS `k1`, x AS `k2`] +----PhysicalProject[random(100) AS `x`, t.a] +------PhysicalOlapScan[tbl_test_physical_translator] + +-- !continue_project_result -- +1 0.9616644308453555 0.9616644308453555 +200 0.32358402105146866 0.32358402105146866 + +-- !continue_filter_shape -- +PhysicalResultSink +--PhysicalProject[(a + 2) AS `x`] +----filter((cast(a as BIGINT) < 9998) and (cast(a as BIGINT) > 3)) +------PhysicalLimit[GLOBAL] +--------PhysicalLimit[LOCAL] +----------PhysicalProject[tbl_test_physical_translator.a] +------------filter((tbl_test_physical_translator.b > 10)) +--------------PhysicalOlapScan[tbl_test_physical_translator] + +-- !continue_filter_result -- +202 + diff --git a/regression-test/suites/nereids_p0/physical_translator/test_physical_translator.groovy b/regression-test/suites/nereids_p0/physical_translator/test_physical_translator.groovy new file mode 100644 index 00000000000000..3e5001a5ea3fdf --- /dev/null +++ b/regression-test/suites/nereids_p0/physical_translator/test_physical_translator.groovy @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_physical_translator") { + def tbl = "tbl_test_physical_translator" + sql 'SET enable_nereids_planner=true' + sql 'SET runtime_filter_mode=OFF' + sql 'SET enable_fallback_to_original_planner=false' + sql "SET ignore_shape_nodes='PhysicalDistribute'" + sql "SET detail_shape_nodes='PhysicalProject'" + sql 'SET disable_nereids_rules=PRUNE_EMPTY_PARTITION' + sql "drop table if exists ${tbl} force" + sql "create table ${tbl} (a int, b int) properties('replication_num' = '1')" + sql "insert into ${tbl} values(1, 10), (200, 300)" + + def sql1 = """ + SELECT a, x as k1, x as k2 FROM (SELECT a, random(100) as x FROM ${tbl}) t + """ + + explainAndOrderResult "continue_project", sql1 + explain { + sql sql1 + contains "VSELECT" + } + + def sql2 = """ + select * from (select a + 2 as x from ${tbl} where b > 10 limit 100)s where x > 5 and x < 10000 + """ + + explainAndOrderResult "continue_filter", sql2 + explain { + sql sql2 + contains "VSELECT" + } + + sql "drop table if exists ${tbl} force" +}