From e5bf103ce30d94871fa6426050e89d4f0d959698 Mon Sep 17 00:00:00 2001 From: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com> Date: Sat, 11 Nov 2023 19:53:24 +0800 Subject: [PATCH] [enhancement](metrics) enhance visibility of flush thread pool (#26544) --- be/src/olap/memtable_flush_executor.cpp | 39 ++++++++++++++---- be/src/olap/memtable_flush_executor.h | 9 ++-- be/src/util/doris_metrics.h | 10 +++++ be/src/vec/exec/scan/scanner_scheduler.cpp | 48 +++++++++++++++++++--- be/src/vec/exec/scan/scanner_scheduler.h | 5 ++- 5 files changed, 92 insertions(+), 19 deletions(-) diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 57c2efb5294234..13952697a16a6d 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -18,29 +18,37 @@ #include "olap/memtable_flush_executor.h" #include -#include #include +#include #include #include "common/config.h" #include "common/logging.h" #include "olap/memtable.h" -#include "util/stopwatch.hpp" -#include "util/time.h" +#include "olap/rowset/rowset_writer.h" +#include "util/doris_metrics.h" +#include "util/metrics.h" namespace doris { using namespace ErrorCode; +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOUNIT); + +bvar::Adder g_flush_task_num("memtable_flush_task_num"); + class MemtableFlushTask final : public Runnable { public: MemtableFlushTask(FlushToken* flush_token, std::unique_ptr memtable, int64_t submit_task_time) : _flush_token(flush_token), _memtable(std::move(memtable)), - _submit_task_time(submit_task_time) {} + _submit_task_time(submit_task_time) { + g_flush_task_num << 1; + } - ~MemtableFlushTask() override = default; + ~MemtableFlushTask() override { g_flush_task_num << -1; } void run() override { _flush_token->_flush_memtable(_memtable.get(), _submit_task_time); @@ -144,10 +152,11 @@ void MemTableFlushExecutor::init(const std::vector& data_dirs) { min_threads = std::max(1, config::high_priority_flush_thread_num_per_store); max_threads = data_dir_num * min_threads; - ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool") - .set_min_threads(min_threads) - .set_max_threads(max_threads) - .build(&_high_prio_flush_pool); + static_cast(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool") + .set_min_threads(min_threads) + .set_max_threads(max_threads) + .build(&_high_prio_flush_pool)); + _register_metrics(); } // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. @@ -178,4 +187,16 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr* fl return Status::OK(); } +void MemTableFlushExecutor::_register_metrics() { + REGISTER_HOOK_METRIC(flush_thread_pool_queue_size, + [this]() { return _flush_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(flush_thread_pool_thread_num, + [this]() { return _flush_pool->num_threads(); }) +} + +void MemTableFlushExecutor::_deregister_metrics() { + DEREGISTER_HOOK_METRIC(flush_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(flush_thread_pool_thread_num); +} + } // namespace doris diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 181b63de72921e..4c8a654c08cd21 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -17,9 +17,8 @@ #pragma once -#include - #include +#include #include #include #include @@ -97,8 +96,9 @@ class FlushToken { // ... class MemTableFlushExecutor { public: - MemTableFlushExecutor() {} + MemTableFlushExecutor() = default; ~MemTableFlushExecutor() { + _deregister_metrics(); _flush_pool->shutdown(); _high_prio_flush_pool->shutdown(); } @@ -111,6 +111,9 @@ class MemTableFlushExecutor { bool should_serial, bool is_high_priority); private: + void _register_metrics(); + static void _deregister_metrics(); + std::unique_ptr _flush_pool; std::unique_ptr _high_prio_flush_pool; }; diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 646a4449c0a47f..023cf75f5b4cb6 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -227,6 +227,16 @@ class DorisMetrics { UIntGauge* heavy_work_max_threads; UIntGauge* light_work_max_threads; + UIntGauge* flush_thread_pool_queue_size; + UIntGauge* flush_thread_pool_thread_num; + + UIntGauge* local_scan_thread_pool_queue_size; + UIntGauge* local_scan_thread_pool_thread_num; + UIntGauge* remote_scan_thread_pool_queue_size; + UIntGauge* remote_scan_thread_pool_thread_num; + UIntGauge* limited_scan_thread_pool_queue_size; + UIntGauge* limited_scan_thread_pool_thread_num; + static DorisMetrics* instance() { static DorisMetrics instance; return &instance; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 3481128a1d2de4..ef97c43cd54c4f 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -17,11 +17,11 @@ #include "scanner_scheduler.h" -#include - #include +#include #include #include +#include #include #include #include @@ -40,6 +40,7 @@ #include "util/blocking_queue.hpp" #include "util/cpu_info.h" #include "util/defer_op.h" +#include "util/doris_metrics.h" #include "util/runtime_profile.h" #include "util/thread.h" #include "util/threadpool.h" @@ -53,6 +54,13 @@ namespace doris::vectorized { +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_thread_num, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_thread_num, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_thread_num, MetricUnit::NOUNIT); + ScannerScheduler::ScannerScheduler() = default; ScannerScheduler::~ScannerScheduler() { @@ -66,6 +74,8 @@ ScannerScheduler::~ScannerScheduler() { _is_closed = true; + _deregister_metrics(); + _scheduler_pool->shutdown(); _local_scan_thread_pool->shutdown(); _remote_scan_thread_pool->shutdown(); @@ -94,9 +104,9 @@ Status ScannerScheduler::init(ExecEnv* env) { } // 2. local scan thread pool - _local_scan_thread_pool.reset( - new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num, - config::doris_scanner_thread_pool_queue_size, "local_scan")); + _local_scan_thread_pool = std::make_unique( + config::doris_scanner_thread_pool_thread_num, + config::doris_scanner_thread_pool_queue_size, "local_scan"); // 3. remote scan thread pool ThreadPoolBuilder("RemoteScanThreadPool") @@ -114,6 +124,8 @@ Status ScannerScheduler::init(ExecEnv* env) { .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) .build(&_limited_scan_thread_pool); + _register_metrics(); + _is_init = true; return Status::OK(); } @@ -151,7 +163,7 @@ void ScannerScheduler::_schedule_thread(int queue_id) { } [[maybe_unused]] static void* run_scanner_bthread(void* arg) { - auto f = reinterpret_cast*>(arg); + auto* f = reinterpret_cast*>(arg); (*f)(); delete f; return nullptr; @@ -401,4 +413,28 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext ctx->push_back_scanner_and_reschedule(scanner); } +void ScannerScheduler::_register_metrics() { + REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size, + [this]() { return _local_scan_thread_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num, + [this]() { return _local_scan_thread_pool->get_active_threads(); }); + REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size, + [this]() { return _remote_scan_thread_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num, + [this]() { return _remote_scan_thread_pool->num_threads(); }); + REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size, + [this]() { return _limited_scan_thread_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num, + [this]() { return _limited_scan_thread_pool->num_threads(); }); +} + +void ScannerScheduler::_deregister_metrics() { + DEREGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num); + DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num); + DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index a9792d9df94a50..fe2841ab2db56d 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -75,7 +75,10 @@ class ScannerScheduler { // execution thread function void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScannerSPtr scanner); -private: + void _register_metrics(); + + static void _deregister_metrics(); + // Scheduling queue number. // TODO: make it configurable. static const int QUEUE_NUM = 4;