From fe8d7a96c98a4fa56e1d04cfb939f025ee3c26af Mon Sep 17 00:00:00 2001 From: wangbo <506340561@qq.com> Date: Thu, 18 Jul 2024 19:59:48 +0800 Subject: [PATCH] add min scan thread num for workload group's scan thread --- be/src/common/config.cpp | 1 + be/src/common/config.h | 1 + .../runtime/workload_group/workload_group.cpp | 5 +- be/src/util/s3_util.cpp | 2 +- be/src/vec/exec/scan/scanner_scheduler.cpp | 60 +++++++------------ be/src/vec/exec/scan/scanner_scheduler.h | 13 ++-- 6 files changed, 38 insertions(+), 44 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 3e9203987c253d..8d30977d62236a 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -250,6 +250,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> b } return true; }); +DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8"); DEFINE_Int32(remote_split_source_batch_size, "10240"); DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1"); // number of olap scanner thread pool queue size diff --git a/be/src/common/config.h b/be/src/common/config.h index 1ce9c66939c256..0596c4a2c9dcfb 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -299,6 +299,7 @@ DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms); // number of scanner thread pool size for olap table // and the min thread num of remote scanner thread pool DECLARE_mInt32(doris_scanner_thread_pool_thread_num); +DECLARE_mInt32(doris_scanner_min_thread_pool_thread_num); // number of batch size to fetch the remote split source DECLARE_mInt32(remote_split_source_batch_size); // max number of remote scanner thread pool size diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index fd45093758e858..f4d1e0d4f7eb95 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -316,7 +316,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info( } // 11 min remote scan thread num - int min_remote_scan_thread_num = vectorized::ScannerScheduler::get_remote_scan_thread_num(); + int min_remote_scan_thread_num = config::doris_scanner_min_thread_pool_thread_num; if (tworkload_group_info.__isset.min_remote_scan_thread_num && tworkload_group_info.min_remote_scan_thread_num > 0) { min_remote_scan_thread_num = tworkload_group_info.min_remote_scan_thread_num; @@ -415,7 +415,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e std::unique_ptr remote_scan_scheduler = std::make_unique("RScan_" + tg_name, cg_cpu_ctl_ptr); - Status ret = remote_scan_scheduler->start(remote_max_thread_num, remote_max_thread_num, + Status ret = remote_scan_scheduler->start(remote_max_thread_num, + config::doris_scanner_min_thread_pool_thread_num, remote_scan_thread_queue_size); if (ret.ok()) { _remote_scan_task_sched = std::move(remote_scan_scheduler); diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp index d7a83fa2cff29a..ffb93c2d9d9f99 100644 --- a/be/src/util/s3_util.cpp +++ b/be/src/util/s3_util.cpp @@ -257,7 +257,7 @@ std::shared_ptr S3ClientFactory::_create_s3_client( aws_config.maxConnections = config::doris_scanner_thread_pool_thread_num; #else aws_config.maxConnections = - ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_size(); + ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_thread_num(); #endif } diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 4d07e66917dcd0..351f5d4e275074 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -80,28 +80,33 @@ void ScannerScheduler::stop() { _is_closed = true; - _local_scan_thread_pool->shutdown(); - _remote_scan_thread_pool->shutdown(); _limited_scan_thread_pool->shutdown(); - - _local_scan_thread_pool->join(); - _remote_scan_thread_pool->join(); _limited_scan_thread_pool->wait(); + _local_scan_thread_pool->stop(); + _remote_scan_thread_pool->stop(); + LOG(INFO) << "ScannerScheduler stopped"; } Status ScannerScheduler::init(ExecEnv* env) { // 1. local scan thread pool - _local_scan_thread_pool = std::make_unique( - config::doris_scanner_thread_pool_thread_num, - config::doris_scanner_thread_pool_queue_size, "local_scan"); + _local_scan_thread_pool = + std::make_unique("local_scan", nullptr); + Status ret1 = _local_scan_thread_pool->start(config::doris_scanner_thread_pool_thread_num, + config::doris_scanner_thread_pool_thread_num, + config::doris_scanner_thread_pool_queue_size); + RETURN_IF_ERROR(ret1); // 2. remote scan thread pool - _remote_thread_pool_max_size = ScannerScheduler::get_remote_scan_thread_num(); + _remote_thread_pool_max_thread_num = ScannerScheduler::get_remote_scan_thread_num(); int remote_scan_pool_queue_size = ScannerScheduler::get_remote_scan_thread_queue_size(); - _remote_scan_thread_pool = std::make_unique( - _remote_thread_pool_max_size, remote_scan_pool_queue_size, "RemoteScanThreadPool"); + _remote_scan_thread_pool = + std::make_unique("RemoteScanThreadPool", nullptr); + Status ret2 = _remote_scan_thread_pool->start(_remote_thread_pool_max_thread_num, + config::doris_scanner_min_thread_pool_thread_num, + remote_scan_pool_queue_size); + RETURN_IF_ERROR(ret2); // 3. limited scan thread pool RETURN_IF_ERROR(ThreadPoolBuilder("LimitedScanThreadPool") @@ -127,9 +132,6 @@ void ScannerScheduler::submit(std::shared_ptr ctx, return; } - // Submit scanners to thread pool - // TODO(cmy): How to handle this "nice"? - int nice = 1; if (ctx->thread_token != nullptr) { std::shared_ptr scanner_delegate = scan_task->scanner.lock(); if (scanner_delegate == nullptr) { @@ -163,27 +165,13 @@ void ScannerScheduler::submit(std::shared_ptr ctx, TabletStorageType type = scanner_delegate->_scanner->get_storage_type(); auto sumbit_task = [&]() { bool is_local = type == TabletStorageType::STORAGE_TYPE_LOCAL; - auto* scan_sched = + SimplifiedScanScheduler* scan_sched = is_local ? ctx->get_simple_scan_scheduler() : ctx->get_remote_scan_scheduler(); - auto& thread_pool = is_local ? _local_scan_thread_pool : _remote_scan_thread_pool; - if (scan_sched) { - auto work_func = [scanner_ref = scan_task, ctx]() { - auto status = [&] { - RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref)); - return Status::OK(); - }(); - - if (!status.ok()) { - scanner_ref->set_status(status); - ctx->append_block_to_queue(scanner_ref); - } - }; - SimplifiedScanTask simple_scan_task = {work_func, ctx}; - return scan_sched->submit_scan_task(simple_scan_task); + if (!scan_sched) { // query without workload group + scan_sched = + is_local ? _local_scan_thread_pool.get() : _remote_scan_thread_pool.get(); } - - PriorityThreadPool::Task task; - task.work_function = [scanner_ref = scan_task, ctx]() { + auto work_func = [scanner_ref = scan_task, ctx]() { auto status = [&] { RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref)); return Status::OK(); @@ -194,10 +182,8 @@ void ScannerScheduler::submit(std::shared_ptr ctx, ctx->append_block_to_queue(scanner_ref); } }; - task.priority = nice; - return thread_pool->offer(task) - ? Status::OK() - : Status::InternalError("Scan thread pool had shutdown"); + SimplifiedScanTask simple_scan_task = {work_func, ctx}; + return scan_sched->submit_scan_task(simple_scan_task); }; if (auto ret = sumbit_task(); !ret) { diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 238afc15bf6254..ddc61396e23f15 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -39,6 +39,7 @@ namespace doris::vectorized { class ScannerDelegate; class ScanTask; class ScannerContext; +class SimplifiedScanScheduler; // Responsible for the scheduling and execution of all Scanners of a BE node. // Execution thread pool @@ -63,7 +64,7 @@ class ScannerScheduler { std::unique_ptr new_limited_scan_pool_token(ThreadPool::ExecutionMode mode, int max_concurrency); - int remote_thread_pool_max_size() const { return _remote_thread_pool_max_size; } + int remote_thread_pool_max_thread_num() const { return _remote_thread_pool_max_thread_num; } static int get_remote_scan_thread_num(); @@ -81,14 +82,14 @@ class ScannerScheduler { // _local_scan_thread_pool is for local scan task(typically, olap scanner) // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.) // _limited_scan_thread_pool is a special pool for queries with resource limit - std::unique_ptr _local_scan_thread_pool; - std::unique_ptr _remote_scan_thread_pool; + std::unique_ptr _local_scan_thread_pool; + std::unique_ptr _remote_scan_thread_pool; std::unique_ptr _limited_scan_thread_pool; // true is the scheduler is closed. std::atomic_bool _is_closed = {false}; bool _is_init = false; - int _remote_thread_pool_max_size; + int _remote_thread_pool_max_thread_num; }; struct SimplifiedScanTask { @@ -193,6 +194,10 @@ class SimplifiedScanScheduler { } } + int get_queue_size() { return _scan_thread_pool->get_queue_size(); } + + int get_active_threads() { return _scan_thread_pool->num_active_threads(); } + std::vector thread_debug_info() { return _scan_thread_pool->debug_info(); } private: