From 797a633f654132000cc21fa75d6b2a426223cc13 Mon Sep 17 00:00:00 2001 From: kakachen Date: Tue, 3 Jun 2025 11:09:18 +0800 Subject: [PATCH] [opt](multi-catalog) Optimize remote scan concurrency. --- be/src/pipeline/exec/file_scan_operator.cpp | 11 +++++------ be/src/vec/exec/scan/scanner_scheduler.cpp | 11 ++++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index ef94e3f1c807a5..b2d51515e73a90 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -39,9 +39,9 @@ Status FileScanLocalState::_init_scanners(std::list* sc auto& p = _parent->cast(); // There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance. - uint32_t shard_num = - std::min(config::doris_scanner_thread_pool_thread_num / p.query_parallel_instance_num(), - _max_scanners); + uint32_t shard_num = std::min(vectorized::ScannerScheduler::get_remote_scan_thread_num() / + p.query_parallel_instance_num(), + _max_scanners); shard_num = std::max(shard_num, 1U); _kv_cache.reset(new vectorized::ShardedKVCache(shard_num)); for (int i = 0; i < _max_scanners; ++i) { @@ -65,9 +65,8 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state, auto& p = _parent->cast(); auto calc_max_scanners = [&](int parallel_instance_num) -> int { - int max_scanners = config::doris_scanner_thread_pool_thread_num / parallel_instance_num; - max_scanners = - std::max(std::max(max_scanners, state->parallel_scan_max_scanners_count()), 1); + int max_scanners = + vectorized::ScannerScheduler::get_remote_scan_thread_num() / parallel_instance_num; if (should_run_serial()) { max_scanners = 1; } diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index c0dd6ecaa8e463..c2e8400954206b 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -402,11 +402,12 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, } int ScannerScheduler::get_remote_scan_thread_num() { - int remote_max_thread_num = config::doris_max_remote_scanner_thread_pool_thread_num != -1 - ? config::doris_max_remote_scanner_thread_pool_thread_num - : std::max(512, CpuInfo::num_cores() * 10); - remote_max_thread_num = - std::max(remote_max_thread_num, config::doris_scanner_thread_pool_thread_num); + static int remote_max_thread_num = []() { + int num = config::doris_max_remote_scanner_thread_pool_thread_num != -1 + ? config::doris_max_remote_scanner_thread_pool_thread_num + : std::max(512, CpuInfo::num_cores() * 10); + return std::max(num, config::doris_scanner_thread_pool_thread_num); + }(); return remote_max_thread_num; }