diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 6e140fff13f87a..9307b12e292551 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1814,10 +1814,38 @@ public PlanFragment visitPhysicalNestedLoopJoin( public PlanFragment visitPhysicalLimit(PhysicalLimit physicalLimit, PlanTranslatorContext context) { PlanFragment inputFragment = physicalLimit.child(0).accept(this, context); PlanNode child = inputFragment.getPlanRoot(); - child.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(), child.getLimit())); - // TODO: plan node don't support limit - // child.setOffset(MergeLimits.mergeOffset(physicalLimit.getOffset(), child.getOffset())); - updateLegacyPlanIdToPhysicalPlan(child, physicalLimit); + + if (physicalLimit.getPhase().isLocal()) { + child.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(), + child.getLimit())); + } else if (physicalLimit.getPhase().isGlobal()) { + if (!(child instanceof ExchangeNode)) { + ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), child); + exchangeNode.setLimit(physicalLimit.getLimit()); + exchangeNode.setOffset(physicalLimit.getOffset()); + exchangeNode.setPartitionType(TPartitionType.UNPARTITIONED); + exchangeNode.setNumInstances(1); + + PlanFragment fragment = new PlanFragment(context.nextFragmentId(), exchangeNode, + DataPartition.UNPARTITIONED); + inputFragment.setDestination(exchangeNode); + inputFragment.setOutputPartition(DataPartition.UNPARTITIONED); + + DataStreamSink sink = new DataStreamSink(exchangeNode.getId()); + sink.setOutputPartition(DataPartition.UNPARTITIONED); + inputFragment.setSink(sink); + + context.addPlanFragment(fragment); + inputFragment = fragment; + } else { + ExchangeNode exchangeNode = (ExchangeNode) child; + exchangeNode.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(), + exchangeNode.getLimit())); + exchangeNode.setOffset(MergeLimits.mergeOffset(physicalLimit.getOffset(), exchangeNode.getOffset())); + } + } + + updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), physicalLimit); return inputFragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/AddOffsetIntoDistribute.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/AddOffsetIntoDistribute.java deleted file mode 100644 index dc8173212982aa..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/AddOffsetIntoDistribute.java +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.processor.post; - -import org.apache.doris.nereids.CascadesContext; -import org.apache.doris.nereids.properties.DistributionSpecGather; -import org.apache.doris.nereids.trees.plans.Plan; -import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; -import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; - -/** - * Offset just can be in exchangeNode. - * So, `offset` action is after `limit` action. - * So, `limit` should update with `offset + limit` - */ -public class AddOffsetIntoDistribute extends PlanPostProcessor { - @Override - public Plan visitPhysicalLimit(PhysicalLimit limit, CascadesContext context) { - limit = (PhysicalLimit) super.visit(limit, context); - if (limit.getPhase().isLocal() || limit.getOffset() == 0) { - return limit; - } - - return new PhysicalDistribute<>(DistributionSpecGather.INSTANCE, - limit.withLimit(limit.getLimit() + limit.getOffset())).copyStatsAndGroupIdFrom(limit); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java index 11a4b73d8a31c5..a8654e27291c06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java @@ -62,7 +62,6 @@ public List getProcessors() { builder.add(new RemoveUselessProjectPostProcessor()); builder.add(new MergeProjectPostProcessor()); builder.add(new RecomputeLogicalPropertiesProcessor()); - builder.add(new AddOffsetIntoDistribute()); if (cascadesContext.getConnectContext().getSessionVariable().enableAggregateCse) { builder.add(new ProjectAggregateExpressionsForCse()); } diff --git a/regression-test/data/nereids_syntax_p0/test_limit.out b/regression-test/data/nereids_syntax_p0/test_limit.out new file mode 100644 index 00000000000000..5ef4497f2f1f85 --- /dev/null +++ b/regression-test/data/nereids_syntax_p0/test_limit.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !limit1 -- +2 7844 TURNER SALESMAN 7698 1981-09-08 1500.0 0.0 30 + +-- !lmit2 -- +3 7934 MILLER CLERK 7782 1982-01-23 1300.0 0.0 10 + diff --git a/regression-test/suites/nereids_syntax_p0/test_limit.groovy b/regression-test/suites/nereids_syntax_p0/test_limit.groovy index 64e48195a178d9..49759f5712868c 100644 --- a/regression-test/suites/nereids_syntax_p0/test_limit.groovy +++ b/regression-test/suites/nereids_syntax_p0/test_limit.groovy @@ -36,4 +36,45 @@ suite("test_limit") { sql "select * from test1 limit 2 offset 1" result([[1]]) } + + sql """ + drop table if exists row_number_limit_tbl; + """ + sql """ + CREATE TABLE row_number_limit_tbl ( + k1 INT NULL, + k2 VARCHAR(255) NULL, + k3 VARCHAR(255) NULL, + k4 INT NULL, + k5 VARCHAR(255) NULL, + k6 FLOAT NULL, + k7 FLOAT NULL, + k8 INT NULL + ) ENGINE=OLAP + DUPLICATE KEY(k1, k2) + DISTRIBUTED BY HASH(k1) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ INSERT INTO row_number_limit_tbl VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000, 0, 20); """ + sql """ INSERT INTO row_number_limit_tbl VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500, 0, 30); """ + sleep(1000) + + qt_limit1 """ + select row_number() over(order by k6 desc) k6s, t.* from row_number_limit_tbl t limit 1 offset 1; + """ + + sql """ truncate table row_number_limit_tbl; """ + sleep(1000) + + sql """ INSERT INTO row_number_limit_tbl VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000, 0, 20); """ + sql """ INSERT INTO row_number_limit_tbl VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500, 0, 30); """ + sql """ INSERT INTO row_number_limit_tbl VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-01-23', 1300, 0, 10); """ + sleep(1000) + + qt_lmit2 """ + select row_number() over(order by k6 desc) k6s, t.* from row_number_limit_tbl t limit 1 offset 2; + """ }