From a0abbba7f49132eddaf97616114ccd0c795948b6 Mon Sep 17 00:00:00 2001 From: 924060929 Date: Wed, 20 Aug 2025 10:20:13 +0800 Subject: [PATCH] [opt](nereids) optimize parallel of insert command when olap table have auto partitions (#54983) optimize parallel of insert command when olap table have auto partitions --- .../plans/physical/PhysicalOlapTableSink.java | 2 +- .../apache/doris/load/loadv2/InsertTest.java | 40 +++++++++++++++++++ .../doris/nereids/util/PlanChecker.java | 16 ++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java index a04912f5119879..99969cdfb373e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java @@ -198,7 +198,7 @@ isPartialUpdate, dmlCommandType, groupExpression, getLogicalProperties(), * get output physical properties */ public PhysicalProperties getRequirePhysicalProperties() { - if (targetTable.isPartitionDistributed()) { + if (targetTable.isPartitionDistributed() || targetTable.getPartitionInfo().enableAutomaticPartition()) { DistributionInfo distributionInfo = targetTable.getDefaultDistributionInfo(); if (distributionInfo instanceof HashDistributionInfo) { // Do not enable shuffle for duplicate key tables when its tablet num is less than threshold. diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertTest.java new file mode 100644 index 00000000000000..41140e58047932 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertTest.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.nereids.trees.plans.commands.insert.AbstractInsertExecutor; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class InsertTest extends TestWithFeService { + @Test + public void testParallelOfInsertAutoPartition() throws Exception { + createDatabase("test"); + useDatabase("test"); + + createTable("create table test.tbl(id int, name varchar(255)) auto partition by list(name)()properties('replication_num'='1')"); + + AbstractInsertExecutor insertExecutor = PlanChecker.from(connectContext) + .getInsertExecutor( + "insert into test.tbl select * from (select 1, 'test' union all select 2, 'doris')a"); + Assertions.assertEquals(1, insertExecutor.getCoordinator().getFragments().size()); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java index 6962572d07a483..d56092b625e136 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java @@ -58,6 +58,8 @@ import org.apache.doris.nereids.trees.plans.GroupPlan; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; +import org.apache.doris.nereids.trees.plans.commands.insert.AbstractInsertExecutor; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; @@ -66,6 +68,8 @@ import org.apache.doris.planner.PlanFragment; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -75,6 +79,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.function.Consumer; import java.util.function.Supplier; @@ -116,6 +121,17 @@ public PlanChecker checkParse(String sql, Consumer consumer) { return this; } + public AbstractInsertExecutor getInsertExecutor(String sql) throws Exception { + StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql); + LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql); + UUID uuid = UUID.randomUUID(); + connectContext.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); + InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) parsedPlan; + LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(parsedPlan, statementContext); + return insertIntoTableCommand.initPlan(connectContext, + new StmtExecutor(connectContext, logicalPlanAdapter)); + } + public PlanChecker parse(String sql) { this.cascadesContext = MemoTestUtils.createCascadesContext(connectContext, sql); this.cascadesContext.toMemo();