36 changes: 30 additions & 6 deletions be/src/pipeline/exec/scan_operator.cpp
@@ -45,6 +45,8 @@

namespace doris::pipeline {

const static int32_t ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT = 10000;

#define RETURN_IF_PUSH_DOWN(stmt, status) \
if (pdt == PushDownType::UNACCEPTABLE) { \
status = stmt; \
@@ -1147,12 +1149,6 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* pool, const TPlanNode&
: OperatorX<LocalStateType>(pool, tnode, operator_id, descs),
_runtime_filter_descs(tnode.runtime_filters),
_parallel_tasks(parallel_tasks) {
if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
// Which means the request could be fullfilled in a single segment iterator request.
if (tnode.limit > 0 && tnode.limit < 1024) {
_should_run_serial = true;
}
}
if (tnode.__isset.push_down_count) {
_push_down_count = tnode.push_down_count;
}
@@ -1185,6 +1181,34 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState*
if (tnode.__isset.topn_filter_source_node_ids) {
topn_filter_source_node_ids = tnode.topn_filter_source_node_ids;
}

// The first branch is kept for compatibility with the old version of the FE
if (!query_options.__isset.enable_adaptive_pipeline_task_serial_read_on_limit) {
if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
// This means the request could be fulfilled by a single segment iterator request.
if (tnode.limit > 0 &&
tnode.limit <= ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT) {
_should_run_serial = true;
}
}
} else {
DCHECK(query_options.__isset.adaptive_pipeline_task_serial_read_on_limit);
// Whether enable_adaptive_pipeline_task_serial_read_on_limit is set
// was already checked in the previous branch.
if (query_options.enable_adaptive_pipeline_task_serial_read_on_limit) {
int32_t adaptive_pipeline_task_serial_read_on_limit =
ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT;
if (query_options.__isset.adaptive_pipeline_task_serial_read_on_limit) {
adaptive_pipeline_task_serial_read_on_limit =
query_options.adaptive_pipeline_task_serial_read_on_limit;
}

if (tnode.limit > 0 && tnode.limit <= adaptive_pipeline_task_serial_read_on_limit) {
_should_run_serial = true;
}
}
}

return Status::OK();
}

16 changes: 14 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
Expand Up @@ -739,8 +739,20 @@ public int getScanRangeNum() {
}

public boolean shouldUseOneInstance(ConnectContext ctx) {
long limitRowsForSingleInstance = ctx == null ? 10000 : ctx.getSessionVariable().limitRowsForSingleInstance;
return hasLimit() && getLimit() < limitRowsForSingleInstance && conjuncts.isEmpty();
int adaptivePipelineTaskSerialReadOnLimit = 10000;

if (ctx != null) {
if (ctx.getSessionVariable().enableAdaptivePipelineTaskSerialReadOnLimit) {
adaptivePipelineTaskSerialReadOnLimit = ctx.getSessionVariable().adaptivePipelineTaskSerialReadOnLimit;
} else {
return false;
}
} else {
// No connection context, typically for broker load.
}

// For UniqueKey tables, multiple instances will be used.
return hasLimit() && getLimit() <= adaptivePipelineTaskSerialReadOnLimit && conjuncts.isEmpty();
}

// In cloud mode, meta read lock is not enough to keep a snapshot of the partition versions.
25 changes: 25 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Expand Up @@ -654,6 +654,11 @@ public class SessionVariable implements Serializable, Writable {

public static final String IN_LIST_VALUE_COUNT_THRESHOLD = "in_list_value_count_threshold";

public static final String ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT =
"enable_adaptive_pipeline_task_serial_read_on_limit";
public static final String ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT =
"adaptive_pipeline_task_serial_read_on_limit";

/**
* If set false, user couldn't submit analyze SQL and FE won't allocate any related resources.
*/
@@ -2115,13 +2120,30 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
})
public boolean enableFallbackOnMissingInvertedIndex = true;


@VariableMgr.VarAttr(name = IN_LIST_VALUE_COUNT_THRESHOLD, description = {
"in条件value数量大于这个threshold后将不会走fast_execute",
"When the number of values in the IN condition exceeds this threshold,"
+ " fast_execute will not be used."
})
public int inListValueCountThreshold = 10;

@VariableMgr.VarAttr(name = ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, needForward = true, description = {
"开启后将会允许自动调整 pipeline task 的并发数。当 scan 节点没有过滤条件,且 limit 参数小于"
+ "adaptive_pipeline_task_serial_read_on_limit 中指定的行数时,scan 的并行度将会被设置为 1",
"When enabled, the pipeline task concurrency will be adjusted automatically. When the scan node has no filter "
+ "conditions and the limit parameter is less than the number of rows specified in "
+ "adaptive_pipeline_task_serial_read_on_limit, the parallelism of the scan will be set to 1."
})
public boolean enableAdaptivePipelineTaskSerialReadOnLimit = true;

@VariableMgr.VarAttr(name = ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, needForward = true, description = {
"当 enable_adaptive_pipeline_task_serial_read_on_limit 开启时,scan 的并行度将会被设置为 1 的行数阈值",
"When enable_adaptive_pipeline_task_serial_read_on_limit is enabled, "
+ "the number of rows at which the parallelism of the scan will be set to 1."
})
public int adaptivePipelineTaskSerialReadOnLimit = 10000;

public void setEnableEsParallelScroll(boolean enableESParallelScroll) {
this.enableESParallelScroll = enableESParallelScroll;
}
@@ -3700,6 +3722,9 @@ public TQueryOptions toThrift() {
tResult.setKeepCarriageReturn(keepCarriageReturn);

tResult.setEnableSegmentCache(enableSegmentCache);

tResult.setEnableAdaptivePipelineTaskSerialReadOnLimit(enableAdaptivePipelineTaskSerialReadOnLimit);
tResult.setAdaptivePipelineTaskSerialReadOnLimit(adaptivePipelineTaskSerialReadOnLimit);
tResult.setInListValueCountThreshold(inListValueCountThreshold);
return tResult;
}
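For reference, a minimal usage sketch of the two session variables added above; the table name t is hypothetical, and the expected behavior follows the variable descriptions and the regression test below:

    -- Keep the adaptive behavior enabled (the default) and use the default threshold.
    SET enable_adaptive_pipeline_task_serial_read_on_limit = true;
    SET adaptive_pipeline_task_serial_read_on_limit = 10000;

    -- No filter conjuncts and LIMIT <= threshold: the scan is expected to run
    -- serially (MaxScannerThreadNum = 1 in the query profile).
    SELECT * FROM t LIMIT 100;

    -- LIMIT above the threshold: scan parallelism is not reduced.
    SELECT * FROM t LIMIT 20000;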
3 changes: 3 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Expand Up @@ -335,6 +335,9 @@ struct TQueryOptions {

127: optional i32 in_list_value_count_threshold = 10;

128: optional bool enable_adaptive_pipeline_task_serial_read_on_limit = true;
129: optional i32 adaptive_pipeline_task_serial_read_on_limit = 10000;

// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
@@ -0,0 +1,175 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.json.StringEscapeUtils


def getProfileList = {
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
return conn.getInputStream().getText()
}


def getProfile = { id ->
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
return conn.getInputStream().getText()
}

suite('adaptive_pipeline_task_serial_read_on_limit') {
sql """
DROP TABLE IF EXISTS adaptive_pipeline_task_serial_read_on_limit;
"""
sql """
CREATE TABLE if not exists `adaptive_pipeline_task_serial_read_on_limit` (
`id` INT,
`name` varchar(32)
) ENGINE=OLAP
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

// Insert data to table
sql """
insert into adaptive_pipeline_task_serial_read_on_limit values
(1, "A"),(2, "B"),(3, "C"),(4, "D"),(5,"E"),(6,"F"),(7,"G"),(8,"H"),(9,"K");
"""
sql """
insert into adaptive_pipeline_task_serial_read_on_limit values
(10, "A"),(20, "B"),(30, "C"),(40, "D"),(50,"E"),(60,"F"),(70,"G"),(80,"H"),(90,"K");
"""
sql """
insert into adaptive_pipeline_task_serial_read_on_limit values
(101, "A"),(201, "B"),(301, "C"),(401, "D"),(501,"E"),(601,"F"),(701,"G"),(801,"H"),(901,"K");
"""
sql """
insert into adaptive_pipeline_task_serial_read_on_limit values
(1010, "A"),(2010, "B"),(3010, "C"),(4010, "D"),(5010,"E"),(6010,"F"),(7010,"G"),(8010,"H"),(9010,"K");
"""

def uuidString = UUID.randomUUID().toString()
sql "set enable_profile=true"
// Set parallel_pipeline_task_num to 1 so that there is only one scan pipeline task,
// and we can check MaxScannerThreadNum in the profile.
sql "set parallel_pipeline_task_num=1;"
// no limit, MaxScannerThreadNum = TabletNum
sql """
select "no_limit_1_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit;
"""
sql "set parallel_pipeline_task_num=0;"
// With Limit, MaxScannerThreadNum = 1
sql """
select "with_limit_1_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 10000;
"""
// With a limit larger than adaptive_pipeline_task_serial_read_on_limit, MaxScannerThreadNum = TabletNum
sql """
select "with_limit_2_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 10001;
"""
sql """
set enable_adaptive_pipeline_task_serial_read_on_limit=false;
"""
sql "set parallel_pipeline_task_num=1;"
// Disable the strategy: even with a limit, MaxScannerThreadNum = TabletNum
sql """
select "not_enable_limit_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 100;
"""

sql "set parallel_pipeline_task_num=0;"

// Enable the strategy and set the threshold to 20; with limit 15, MaxScannerThreadNum = 1
sql """
set enable_adaptive_pipeline_task_serial_read_on_limit=true;
"""
sql """
set adaptive_pipeline_task_serial_read_on_limit=20;
"""
sql """
select "modify_to_20_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 15;
"""

sql "set enable_profile=false"

def wholeString = getProfileList()
List profileData = new JsonSlurper().parseText(wholeString).data.rows
String queryIdNoLimit1 = "";
String queryIdWithLimit1 = "";
String queryIdWithLimit2 = "";
String queryIDNotEnableLimit = "";
String queryIdModifyTo20 = "";

logger.info("{}", uuidString)

for (def profileItem in profileData) {
if (profileItem["Sql Statement"].toString().contains("no_limit_1_${uuidString}")) {
queryIdNoLimit1 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
if (profileItem["Sql Statement"].toString().contains("with_limit_1_${uuidString}")) {
queryIdWithLimit1 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
if (profileItem["Sql Statement"].toString().contains("with_limit_2_${uuidString}")) {
queryIdWithLimit2 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
if (profileItem["Sql Statement"].toString().contains("not_enable_limit_${uuidString}")) {
queryIDNotEnableLimit = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
if (profileItem["Sql Statement"].toString().contains("modify_to_20_${uuidString}")) {
queryIdModifyTo20 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
}

logger.info("queryIdNoLimit1_${uuidString}: {}", queryIdNoLimit1)
logger.info("queryIdWithLimit1_${uuidString}: {}", queryIdWithLimit1)
logger.info("queryIdWithLimit2_${uuidString}: {}", queryIdWithLimit2)
logger.info("queryIDNotEnableLimit_${uuidString}: {}", queryIDNotEnableLimit)
logger.info("queryIdModifyTo20_${uuidString}: {}", queryIdModifyTo20)

assertTrue(queryIdNoLimit1 != "")
assertTrue(queryIdWithLimit1 != "")
assertTrue(queryIdWithLimit2 != "")
assertTrue(queryIDNotEnableLimit != "")
assertTrue(queryIdModifyTo20 != "")

String profileNoLimit1 = getProfile(queryIdNoLimit1).toString()
String profileWithLimit1 = getProfile(queryIdWithLimit1).toString()
String profileWithLimit2 = getProfile(queryIdWithLimit2).toString()
String profileNotEnableLimit = getProfile(queryIDNotEnableLimit).toString()
String profileModifyTo20 = getProfile(queryIdModifyTo20).toString()

assertTrue(profileNoLimit1.contains("- MaxScannerThreadNum: 10"))
assertTrue(profileWithLimit1.contains("- MaxScannerThreadNum: 1"))
assertTrue(profileWithLimit2.contains("- MaxScannerThreadNum: 10"))
assertTrue(profileNotEnableLimit.contains("- MaxScannerThreadNum: 10"))
assertTrue(profileModifyTo20.contains("- MaxScannerThreadNum: 1"))
}