diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index d7af005394479e..73cd02b5a5dd18 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -45,6 +45,8 @@
 
 namespace doris::pipeline {
 
+const static int32_t ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT = 10000;
+
 #define RETURN_IF_PUSH_DOWN(stmt, status)    \
     if (pdt == PushDownType::UNACCEPTABLE) { \
         status = stmt;                       \
@@ -1147,12 +1149,6 @@ ScanOperatorX::ScanOperatorX(ObjectPool* pool, const TPlanNode&
         : OperatorX(pool, tnode, operator_id, descs),
           _runtime_filter_descs(tnode.runtime_filters),
           _parallel_tasks(parallel_tasks) {
-    if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
-        // Which means the request could be fullfilled in a single segment iterator request.
-        if (tnode.limit > 0 && tnode.limit < 1024) {
-            _should_run_serial = true;
-        }
-    }
     if (tnode.__isset.push_down_count) {
         _push_down_count = tnode.push_down_count;
     }
@@ -1185,6 +1181,34 @@ Status ScanOperatorX::init(const TPlanNode& tnode, RuntimeState*
     if (tnode.__isset.topn_filter_source_node_ids) {
         topn_filter_source_node_ids = tnode.topn_filter_source_node_ids;
     }
+
+    // The first branch is kept for compatibility with older versions of the FE.
+    if (!query_options.__isset.enable_adaptive_pipeline_task_serial_read_on_limit) {
+        if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
+            // Which means the request could be fulfilled in a single segment iterator request.
+            if (tnode.limit > 0 &&
+                tnode.limit <= ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT) {
+                _should_run_serial = true;
+            }
+        }
+    } else {
+        DCHECK(query_options.__isset.adaptive_pipeline_task_serial_read_on_limit);
+        // Whether enable_adaptive_pipeline_task_serial_read_on_limit is set
+        // has already been checked by the branch above.
+        if (query_options.enable_adaptive_pipeline_task_serial_read_on_limit) {
+            int32_t adaptive_pipeline_task_serial_read_on_limit =
+                    ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT;
+            if (query_options.__isset.adaptive_pipeline_task_serial_read_on_limit) {
+                adaptive_pipeline_task_serial_read_on_limit =
+                        query_options.adaptive_pipeline_task_serial_read_on_limit;
+            }
+
+            if (tnode.limit > 0 && tnode.limit <= adaptive_pipeline_task_serial_read_on_limit) {
+                _should_run_serial = true;
+            }
+        }
+    }
+
     return Status::OK();
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 50b0f5a026909c..a92cac7b510260 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -739,8 +739,20 @@ public int getScanRangeNum() {
     }
 
     public boolean shouldUseOneInstance(ConnectContext ctx) {
-        long limitRowsForSingleInstance = ctx == null ? 10000 : ctx.getSessionVariable().limitRowsForSingleInstance;
-        return hasLimit() && getLimit() < limitRowsForSingleInstance && conjuncts.isEmpty();
+        int adaptivePipelineTaskSerialReadOnLimit = 10000;
+
+        if (ctx != null) {
+            if (ctx.getSessionVariable().enableAdaptivePipelineTaskSerialReadOnLimit) {
+                adaptivePipelineTaskSerialReadOnLimit = ctx.getSessionVariable().adaptivePipelineTaskSerialReadOnLimit;
+            } else {
+                return false;
+            }
+        } else {
+            // No connection context, typically for broker load.
+        }
+
+        // For a UniqueKey table, we will use multiple instances.
+        return hasLimit() && getLimit() <= adaptivePipelineTaskSerialReadOnLimit && conjuncts.isEmpty();
     }
 
     // In cloud mode, meta read lock is not enough to keep a snapshot of the partition versions.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 4140835b657004..c1d3c7a3efb0ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -654,6 +654,11 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String IN_LIST_VALUE_COUNT_THRESHOLD = "in_list_value_count_threshold";
 
+    public static final String ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT =
+            "enable_adaptive_pipeline_task_serial_read_on_limit";
+    public static final String ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT =
+            "adaptive_pipeline_task_serial_read_on_limit";
+
     /**
      * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources.
     */
@@ -2115,6 +2120,7 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
     })
     public boolean enableFallbackOnMissingInvertedIndex = true;
 
+
     @VariableMgr.VarAttr(name = IN_LIST_VALUE_COUNT_THRESHOLD, description = {
             "in条件value数量大于这个threshold后将不会走fast_execute",
             "When the number of values in the IN condition exceeds this threshold,"
@@ -2122,6 +2128,22 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
     })
     public int inListValueCountThreshold = 10;
 
+    @VariableMgr.VarAttr(name = ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, needForward = true, description = {
+            "开启后将会允许自动调整 pipeline task 的并发数。当 scan 节点没有过滤条件,且 limit 参数小于等于"
+                    + "adaptive_pipeline_task_serial_read_on_limit 中指定的行数时,scan 的并行度将会被设置为 1",
+            "When enabled, the pipeline task concurrency will be adjusted automatically. When the scan node has no filter "
+                    + "conditions and the limit parameter is less than or equal to the number of rows specified in "
+                    + "adaptive_pipeline_task_serial_read_on_limit, the parallelism of the scan will be set to 1."
+    })
+    public boolean enableAdaptivePipelineTaskSerialReadOnLimit = true;
+
+    @VariableMgr.VarAttr(name = ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, needForward = true, description = {
+            "当 enable_adaptive_pipeline_task_serial_read_on_limit 开启时,scan 的并行度将会被设置为 1 的行数阈值",
+            "When enable_adaptive_pipeline_task_serial_read_on_limit is enabled, "
+                    + "the number of rows at or below which the parallelism of the scan will be set to 1."
+    })
+    public int adaptivePipelineTaskSerialReadOnLimit = 10000;
+
     public void setEnableEsParallelScroll(boolean enableESParallelScroll) {
         this.enableESParallelScroll = enableESParallelScroll;
     }
@@ -3700,6 +3722,9 @@ public TQueryOptions toThrift() {
         tResult.setKeepCarriageReturn(keepCarriageReturn);
 
         tResult.setEnableSegmentCache(enableSegmentCache);
+
+        tResult.setEnableAdaptivePipelineTaskSerialReadOnLimit(enableAdaptivePipelineTaskSerialReadOnLimit);
+        tResult.setAdaptivePipelineTaskSerialReadOnLimit(adaptivePipelineTaskSerialReadOnLimit);
         tResult.setInListValueCountThreshold(inListValueCountThreshold);
         return tResult;
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 85e4ade4ca4adc..9da87117154dcc 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -335,6 +335,9 @@ struct TQueryOptions {
 
   127: optional i32 in_list_value_count_threshold = 10;
 
+  128: optional bool enable_adaptive_pipeline_task_serial_read_on_limit = true;
+  129: optional i32 adaptive_pipeline_task_serial_read_on_limit = 10000;
+
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.
diff --git a/regression-test/suites/query_profile/adaptive_pipeline_task_serial_read_on_limit.groovy b/regression-test/suites/query_profile/adaptive_pipeline_task_serial_read_on_limit.groovy
new file mode 100644
index 00000000000000..15481fe7c9a8dc
--- /dev/null
+++ b/regression-test/suites/query_profile/adaptive_pipeline_task_serial_read_on_limit.groovy
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonOutput
+import groovy.json.JsonSlurper
+import groovy.json.StringEscapeUtils
+
+
+def getProfileList = {
+    def dst = 'http://' + context.config.feHttpAddress
+    def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
+    conn.setRequestMethod("GET")
+    def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+            (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+    conn.setRequestProperty("Authorization", "Basic ${encoding}")
+    return conn.getInputStream().getText()
+}
+
+
+def getProfile = { id ->
+    def dst = 'http://' + context.config.feHttpAddress
+    def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
+    conn.setRequestMethod("GET")
+    def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+            (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+    conn.setRequestProperty("Authorization", "Basic ${encoding}")
+    return conn.getInputStream().getText()
+}
+
+suite('adaptive_pipeline_task_serial_read_on_limit') {
+    sql """
+        DROP TABLE IF EXISTS adaptive_pipeline_task_serial_read_on_limit;
+    """
+    sql """
+        CREATE TABLE if not exists `adaptive_pipeline_task_serial_read_on_limit` (
+            `id` INT,
+            `name` varchar(32)
+        ) ENGINE=OLAP
+        DISTRIBUTED BY HASH(`id`) BUCKETS 10
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    // Insert data into the table
+    sql """
+        insert into adaptive_pipeline_task_serial_read_on_limit values
+        (1, "A"),(2, "B"),(3, "C"),(4, "D"),(5,"E"),(6,"F"),(7,"G"),(8,"H"),(9,"K");
+    """
+    sql """
+        insert into adaptive_pipeline_task_serial_read_on_limit values
+        (10, "A"),(20, "B"),(30, "C"),(40, "D"),(50,"E"),(60,"F"),(70,"G"),(80,"H"),(90,"K");
+    """
+    sql """
+        insert into adaptive_pipeline_task_serial_read_on_limit values
+        (101, "A"),(201, "B"),(301, "C"),(401, "D"),(501,"E"),(601,"F"),(701,"G"),(801,"H"),(901,"K");
+    """
+    sql """
+        insert into adaptive_pipeline_task_serial_read_on_limit values
+        (1010, "A"),(2010, "B"),(3010, "C"),(4010, "D"),(5010,"E"),(6010,"F"),(7010,"G"),(8010,"H"),(9010,"K");
+    """
+
+    def uuidString = UUID.randomUUID().toString()
+    sql "set enable_profile=true"
+    // Set parallel_pipeline_task_num to 1 so that there is only one scan pipeline task,
+    // and we can check MaxScannerThreadNum in the profile.
+    sql "set parallel_pipeline_task_num=1;"
+    // No limit, MaxScannerThreadNum = TabletNum
+    sql """
+        select "no_limit_1_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit;
+    """
+    sql "set parallel_pipeline_task_num=0;"
+    // With limit, MaxScannerThreadNum = 1
+    sql """
+        select "with_limit_1_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 10000;
+    """
+    // With limit, but larger than adaptive_pipeline_task_serial_read_on_limit, MaxScannerThreadNum = TabletNum
+    sql """
+        select "with_limit_2_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 10001;
+    """
+    sql """
+        set enable_adaptive_pipeline_task_serial_read_on_limit=false;
+    """
+    sql "set parallel_pipeline_task_num=1;"
+    // Disable the strategy: even with a limit, MaxScannerThreadNum = TabletNum
+    sql """
+        select "not_enable_limit_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 100;
+    """
+
+    sql "set parallel_pipeline_task_num=0;"
+
+    // Enable the strategy and lower the threshold to 20; limit 15 is within it, so MaxScannerThreadNum = 1
+    sql """
+        set enable_adaptive_pipeline_task_serial_read_on_limit=true;
+    """
+    sql """
+        set adaptive_pipeline_task_serial_read_on_limit=20;
+    """
+    sql """
+        select "modify_to_20_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 15;
+    """
+
+    sql "set enable_profile=false"
+
+    def wholeString = getProfileList()
+    List profileData = new JsonSlurper().parseText(wholeString).data.rows
+    String queryIdNoLimit1 = "";
+    String queryIdWithLimit1 = "";
+    String queryIdWithLimit2 = "";
+    String queryIDNotEnableLimit = "";
+    String queryIdModifyTo20 = "";
+
+    logger.info("{}", uuidString)
+
+    for (def profileItem in profileData) {
+        if (profileItem["Sql Statement"].toString().contains("no_limit_1_${uuidString}")) {
+            queryIdNoLimit1 = profileItem["Profile ID"].toString()
+            logger.info("profileItem: {}", profileItem)
+        }
+        if (profileItem["Sql Statement"].toString().contains("with_limit_1_${uuidString}")) {
+            queryIdWithLimit1 = profileItem["Profile ID"].toString()
+            logger.info("profileItem: {}", profileItem)
+        }
+        if (profileItem["Sql Statement"].toString().contains("with_limit_2_${uuidString}")) {
+            queryIdWithLimit2 = profileItem["Profile ID"].toString()
+            logger.info("profileItem: {}", profileItem)
+        }
+        if (profileItem["Sql Statement"].toString().contains("not_enable_limit_${uuidString}")) {
+            queryIDNotEnableLimit = profileItem["Profile ID"].toString()
+            logger.info("profileItem: {}", profileItem)
+        }
+        if (profileItem["Sql Statement"].toString().contains("modify_to_20_${uuidString}")) {
+            queryIdModifyTo20 = profileItem["Profile ID"].toString()
+            logger.info("profileItem: {}", profileItem)
+        }
+    }
+
+    logger.info("queryIdNoLimit1_${uuidString}: {}", queryIdNoLimit1)
+    logger.info("queryIdWithLimit1_${uuidString}: {}", queryIdWithLimit1)
+    logger.info("queryIdWithLimit2_${uuidString}: {}", queryIdWithLimit2)
+    logger.info("queryIDNotEnableLimit_${uuidString}: {}", queryIDNotEnableLimit)
+    logger.info("queryIdModifyTo20_${uuidString}: {}", queryIdModifyTo20)
+
+    assertTrue(queryIdNoLimit1 != "")
+    assertTrue(queryIdWithLimit1 != "")
+    assertTrue(queryIdWithLimit2 != "")
+    assertTrue(queryIDNotEnableLimit != "")
+    assertTrue(queryIdModifyTo20 != "")
+
+    def String profileNoLimit1 = getProfile(queryIdNoLimit1).toString()
+    def String profileWithLimit1 = getProfile(queryIdWithLimit1).toString()
+    def String profileWithLimit2 = getProfile(queryIdWithLimit2).toString()
+    def String profileNotEnableLimit = getProfile(queryIDNotEnableLimit).toString()
+    def String profileModifyTo20 = getProfile(queryIdModifyTo20).toString()
+
+    assertTrue(profileNoLimit1.contains("- MaxScannerThreadNum: 10"))
+    assertTrue(profileWithLimit1.contains("- MaxScannerThreadNum: 1"))
+    assertTrue(profileWithLimit2.contains("- MaxScannerThreadNum: 10"))
+    assertTrue(profileNotEnableLimit.contains("- MaxScannerThreadNum: 10"))
+    assertTrue(profileModifyTo20.contains("- MaxScannerThreadNum: 1"))
+}
\ No newline at end of file
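
Usage sketch: the statements below show how the two session variables added in this patch are expected to be used, based on the defaults introduced in SessionVariable.java and PaloInternalService.thrift (enabled, 10000-row threshold). The table name `t` is hypothetical; everything else (variable names, defaults, the MaxScannerThreadNum profile counter) comes from the patch and its regression test.

    -- Adaptive serial read is on by default with a 10000-row threshold.
    set enable_adaptive_pipeline_task_serial_read_on_limit = true;
    set adaptive_pipeline_task_serial_read_on_limit = 10000;

    -- No filter conjuncts and LIMIT <= the threshold: the scan runs serially,
    -- and the query profile should report "- MaxScannerThreadNum: 1".
    select * from t limit 100;

    -- LIMIT above the threshold, or any filter in the WHERE clause: the scan
    -- keeps its normal parallelism (MaxScannerThreadNum = TabletNum).
    select * from t limit 20000;

Setting enable_adaptive_pipeline_task_serial_read_on_limit = false disables the heuristic entirely, which is the behavior exercised by the not_enable_limit case in the regression test above.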