Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix]add min scan thread num for workload group's scan thread #38096

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> b
}
return true;
});
DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
DEFINE_Int32(remote_split_source_batch_size, "10240");
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
// number of olap scanner thread pool queue size
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms);
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
DECLARE_mInt32(doris_scanner_thread_pool_thread_num);
DECLARE_mInt32(doris_scanner_min_thread_pool_thread_num);
// number of batch size to fetch the remote split source
DECLARE_mInt32(remote_split_source_batch_size);
// max number of remote scanner thread pool size
Expand Down
5 changes: 3 additions & 2 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
}

// 11 min remote scan thread num
int min_remote_scan_thread_num = vectorized::ScannerScheduler::get_remote_scan_thread_num();
int min_remote_scan_thread_num = config::doris_scanner_min_thread_pool_thread_num;
if (tworkload_group_info.__isset.min_remote_scan_thread_num &&
tworkload_group_info.min_remote_scan_thread_num > 0) {
min_remote_scan_thread_num = tworkload_group_info.min_remote_scan_thread_num;
Expand Down Expand Up @@ -415,7 +415,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler =
std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + tg_name,
cg_cpu_ctl_ptr);
Status ret = remote_scan_scheduler->start(remote_max_thread_num, remote_max_thread_num,
Status ret = remote_scan_scheduler->start(remote_max_thread_num,
config::doris_scanner_min_thread_pool_thread_num,
remote_scan_thread_queue_size);
if (ret.ok()) {
_remote_scan_task_sched = std::move(remote_scan_scheduler);
Expand Down
2 changes: 1 addition & 1 deletion be/src/util/s3_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_s3_client(
aws_config.maxConnections = config::doris_scanner_thread_pool_thread_num;
#else
aws_config.maxConnections =
ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_size();
ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_thread_num();
#endif
}

Expand Down
60 changes: 23 additions & 37 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,28 +80,33 @@ void ScannerScheduler::stop() {

_is_closed = true;

_local_scan_thread_pool->shutdown();
_remote_scan_thread_pool->shutdown();
_limited_scan_thread_pool->shutdown();

_local_scan_thread_pool->join();
_remote_scan_thread_pool->join();
_limited_scan_thread_pool->wait();

_local_scan_thread_pool->stop();
_remote_scan_thread_pool->stop();

LOG(INFO) << "ScannerScheduler stopped";
}

Status ScannerScheduler::init(ExecEnv* env) {
// 1. local scan thread pool
_local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size, "local_scan");
_local_scan_thread_pool =
std::make_unique<vectorized::SimplifiedScanScheduler>("local_scan", nullptr);
Status ret1 = _local_scan_thread_pool->start(config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size);
RETURN_IF_ERROR(ret1);

// 2. remote scan thread pool
_remote_thread_pool_max_size = ScannerScheduler::get_remote_scan_thread_num();
_remote_thread_pool_max_thread_num = ScannerScheduler::get_remote_scan_thread_num();
int remote_scan_pool_queue_size = ScannerScheduler::get_remote_scan_thread_queue_size();
_remote_scan_thread_pool = std::make_unique<PriorityThreadPool>(
_remote_thread_pool_max_size, remote_scan_pool_queue_size, "RemoteScanThreadPool");
_remote_scan_thread_pool =
std::make_unique<vectorized::SimplifiedScanScheduler>("RemoteScanThreadPool", nullptr);
Status ret2 = _remote_scan_thread_pool->start(_remote_thread_pool_max_thread_num,
config::doris_scanner_min_thread_pool_thread_num,
remote_scan_pool_queue_size);
RETURN_IF_ERROR(ret2);

// 3. limited scan thread pool
RETURN_IF_ERROR(ThreadPoolBuilder("LimitedScanThreadPool")
Expand All @@ -127,9 +132,6 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
return;
}

// Submit scanners to thread pool
// TODO(cmy): How to handle this "nice"?
int nice = 1;
if (ctx->thread_token != nullptr) {
std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
Expand Down Expand Up @@ -163,27 +165,13 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
auto sumbit_task = [&]() {
bool is_local = type == TabletStorageType::STORAGE_TYPE_LOCAL;
auto* scan_sched =
SimplifiedScanScheduler* scan_sched =
is_local ? ctx->get_simple_scan_scheduler() : ctx->get_remote_scan_scheduler();
auto& thread_pool = is_local ? _local_scan_thread_pool : _remote_scan_thread_pool;
if (scan_sched) {
auto work_func = [scanner_ref = scan_task, ctx]() {
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
}();

if (!status.ok()) {
scanner_ref->set_status(status);
ctx->append_block_to_queue(scanner_ref);
}
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
return scan_sched->submit_scan_task(simple_scan_task);
if (!scan_sched) { // query without workload group
scan_sched =
is_local ? _local_scan_thread_pool.get() : _remote_scan_thread_pool.get();
}

PriorityThreadPool::Task task;
task.work_function = [scanner_ref = scan_task, ctx]() {
auto work_func = [scanner_ref = scan_task, ctx]() {
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
Expand All @@ -194,10 +182,8 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
ctx->append_block_to_queue(scanner_ref);
}
};
task.priority = nice;
return thread_pool->offer(task)
? Status::OK()
: Status::InternalError("Scan thread pool had shutdown");
SimplifiedScanTask simple_scan_task = {work_func, ctx};
return scan_sched->submit_scan_task(simple_scan_task);
};

if (auto ret = sumbit_task(); !ret) {
Expand Down
13 changes: 9 additions & 4 deletions be/src/vec/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ namespace doris::vectorized {
class ScannerDelegate;
class ScanTask;
class ScannerContext;
class SimplifiedScanScheduler;

// Responsible for the scheduling and execution of all Scanners of a BE node.
// Execution thread pool
Expand All @@ -63,7 +64,7 @@ class ScannerScheduler {
std::unique_ptr<ThreadPoolToken> new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
int max_concurrency);

int remote_thread_pool_max_size() const { return _remote_thread_pool_max_size; }
int remote_thread_pool_max_thread_num() const { return _remote_thread_pool_max_thread_num; }

static int get_remote_scan_thread_num();

Expand All @@ -81,14 +82,14 @@ class ScannerScheduler {
// _local_scan_thread_pool is for local scan task(typically, olap scanner)
// _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.)
// _limited_scan_thread_pool is a special pool for queries with resource limit
std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool;
std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
std::unique_ptr<vectorized::SimplifiedScanScheduler> _local_scan_thread_pool;
std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_thread_pool;
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;

// true is the scheduler is closed.
std::atomic_bool _is_closed = {false};
bool _is_init = false;
int _remote_thread_pool_max_size;
int _remote_thread_pool_max_thread_num;
};

struct SimplifiedScanTask {
Expand Down Expand Up @@ -193,6 +194,10 @@ class SimplifiedScanScheduler {
}
}

int get_queue_size() { return _scan_thread_pool->get_queue_size(); }

int get_active_threads() { return _scan_thread_pool->num_active_threads(); }

std::vector<int> thread_debug_info() { return _scan_thread_pool->debug_info(); }

private:
Expand Down
Loading