diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index eb8618d4048758..e495b5ea95b956 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -107,11 +107,9 @@ BvarStatusWithTag g_bvar_recycler_recycle_tmp_rowset_earlest_ts("recycl BvarStatusWithTag g_bvar_recycler_recycle_expired_txn_label_earlest_ts("recycler", "recycle_expired_txn_label_earlest_ts"); BvarStatusWithTag g_bvar_recycler_recycle_restore_job_earlest_ts("recycler", "recycle_restore_job_earlest_ts"); bvar::Status g_bvar_recycler_task_max_concurrency("recycler_task_max_concurrency_num",0); -// current concurrency of recycle task -bvar::Adder g_bvar_recycler_instance_recycle_task_concurrency; - +// current status of recycle task (submitted, completed, error) +mBvarIntAdder g_bvar_recycler_instance_recycle_task_status("recycler_instance_recycle_task_status", { "status"}); // recycler's mbvars -bvar::Adder g_bvar_recycler_instance_running_counter("recycler_instance_running_counter"); // cost time of the last whole recycle process mBvarStatus g_bvar_recycler_instance_last_round_recycle_duration("recycler_instance_last_round_recycle_duration",{"instance_id"}); mBvarStatus g_bvar_recycler_instance_next_ts("recycler_instance_next_ts",{"instance_id"}); @@ -122,12 +120,10 @@ mBvarStatus g_bvar_recycler_instance_recycle_last_success_ts("recycler_ // recycler's mbvars // instance_id: unique identifier for the instance -// resource_type: type of resource need to be recycled (index, partition, rowset, segment, tablet, etc.) // resource_id: unique identifier for the repository -// status: status of the recycle task (normal, abnormal, etc.) 
-mBvarIntAdder g_bvar_recycler_vault_recycle_status("recycler_vault_recycle_status", {"instance_id", "resource_id", "status"}); +// status: status of the recycle task (submitted, completed, error) +mBvarIntAdder g_bvar_recycler_vault_recycle_task_status("recycler_vault_recycle_task_status", {"instance_id", "resource_id", "status"}); // current concurrency of vault delete task -mBvarIntAdder g_bvar_recycler_vault_recycle_task_concurrency("recycler_vault_recycle_task_concurrency", {"instance_id", "resource_type", "resource_id"}); mBvarStatus g_bvar_recycler_instance_last_round_recycled_num("recycler_instance_last_round_recycled_num", {"instance_id", "resource_type"}); mBvarStatus g_bvar_recycler_instance_last_round_to_recycle_num("recycler_instance_last_round_to_recycle_num", {"instance_id", "resource_type"}); mBvarStatus g_bvar_recycler_instance_last_round_recycled_bytes("recycler_instance_last_round_recycled_bytes", {"instance_id", "resource_type"}); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index 9d7dfa232f01c9..740e322b9008b9 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -267,16 +267,14 @@ extern BvarStatusWithTag g_bvar_recycler_recycle_restore_job_earlest_ts // recycler's mbvars extern bvar::Status g_bvar_recycler_task_max_concurrency; -extern bvar::Adder g_bvar_recycler_instance_recycle_task_concurrency; -extern bvar::Adder g_bvar_recycler_instance_running_counter; +extern mBvarIntAdder g_bvar_recycler_instance_recycle_task_status; extern mBvarStatus g_bvar_recycler_instance_last_round_recycle_duration; extern mBvarStatus g_bvar_recycler_instance_next_ts; extern mBvarStatus g_bvar_recycler_instance_recycle_start_ts; extern mBvarStatus g_bvar_recycler_instance_recycle_end_ts; extern mBvarStatus g_bvar_recycler_instance_recycle_last_success_ts; -extern mBvarIntAdder g_bvar_recycler_vault_recycle_status; -extern mBvarIntAdder g_bvar_recycler_vault_recycle_task_concurrency; +extern mBvarIntAdder 
g_bvar_recycler_vault_recycle_task_status; extern mBvarStatus g_bvar_recycler_instance_last_round_recycled_num; extern mBvarStatus g_bvar_recycler_instance_last_round_to_recycle_num; extern mBvarStatus g_bvar_recycler_instance_last_round_recycled_bytes; diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index fab7de75c2aeb0..8e19c9ccd196ad 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -69,9 +69,6 @@ namespace doris::cloud { using namespace std::chrono; -RecyclerMetricsContext tablet_metrics_context_("global_recycler", "recycle_tablet"); -RecyclerMetricsContext segment_metrics_context_("global_recycler", "recycle_segment"); - // return 0 for success get a key, 1 for key not found, negative for error [[maybe_unused]] static int txn_get(TxnKv* txn_kv, std::string_view key, std::string& val) { std::unique_ptr txn; @@ -292,21 +289,18 @@ void Recycler::recycle_callback() { if (stopped()) return; LOG_INFO("begin to recycle instance").tag("instance_id", instance_id); auto ctime_ms = duration_cast(system_clock::now().time_since_epoch()).count(); - g_bvar_recycler_instance_recycle_task_concurrency << 1; - g_bvar_recycler_instance_running_counter << 1; g_bvar_recycler_instance_recycle_start_ts.put({instance_id}, ctime_ms); - tablet_metrics_context_.reset(); - segment_metrics_context_.reset(); + g_bvar_recycler_instance_recycle_task_status.put({"submitted"}, 1); ret = instance_recycler->do_recycle(); - tablet_metrics_context_.finish_report(); - segment_metrics_context_.finish_report(); - g_bvar_recycler_instance_recycle_task_concurrency << -1; - g_bvar_recycler_instance_running_counter << -1; // If instance recycler has been aborted, don't finish this job + if (!instance_recycler->stopped()) { finish_instance_recycle_job(txn_kv_.get(), recycle_job_key, instance_id, ip_port_, ret == 0, ctime_ms); } + if (instance_recycler->stopped() || ret != 0) { + g_bvar_recycler_instance_recycle_task_status.put({"error"}, 
1); + } { std::lock_guard lock(mtx_); recycling_instance_map_.erase(instance_id); @@ -317,6 +311,7 @@ void Recycler::recycle_callback() { g_bvar_recycler_instance_last_round_recycle_duration.put({instance_id}, elpased_ms); g_bvar_recycler_instance_next_ts.put({instance_id}, now + config::recycle_interval_seconds * 1000); + g_bvar_recycler_instance_recycle_task_status.put({"completed"}, 1); LOG(INFO) << "recycle instance done, " << "instance_id=" << instance_id << " ret=" << ret << " ctime_ms: " << ctime_ms << " now: " << now; @@ -673,6 +668,12 @@ auto task_wrapper(Func... funcs) -> std::function { int InstanceRecycler::do_recycle() { TEST_SYNC_POINT("InstanceRecycler.do_recycle"); + tablet_metrics_context_.reset(); + segment_metrics_context_.reset(); + DORIS_CLOUD_DEFER { + tablet_metrics_context_.finish_report(); + segment_metrics_context_.finish_report(); + }; if (instance_info_.status() == InstanceInfoPB::DELETED) { return recycle_deleted_instance(); } else if (instance_info_.status() == InstanceInfoPB::NORMAL) { @@ -1889,31 +1890,31 @@ int InstanceRecycler::delete_rowset_data( //020000000000007fd045a62bc87a6587dd7ac274aa36e5a9_0.idx std::set deleted_rowset_id; - std::for_each( - paths->begin(), paths->end(), - [&metrics_context, &rowsets, &deleted_rowset_id](const std::string& path) { - std::vector str; - butil::SplitString(path, '/', &str); - std::string rowset_id; - if (auto pos = str.back().find('_'); pos != std::string::npos) { - rowset_id = str.back().substr(0, pos); - } else { - LOG(WARNING) << "failed to parse rowset_id, path=" << path; - return; - } - auto rs_meta = rowsets.find(rowset_id); - if (rs_meta != rowsets.end() && - !deleted_rowset_id.contains(rowset_id)) { - deleted_rowset_id.emplace(rowset_id); - metrics_context.total_recycled_data_size += - rs_meta->second.total_disk_size(); - segment_metrics_context_.total_recycled_num += - rs_meta->second.num_segments(); - segment_metrics_context_.total_recycled_data_size += - 
rs_meta->second.total_disk_size(); - metrics_context.total_recycled_num++; - } - }); + std::for_each(paths->begin(), paths->end(), + [&metrics_context, &rowsets, &deleted_rowset_id, + this](const std::string& path) { + std::vector str; + butil::SplitString(path, '/', &str); + std::string rowset_id; + if (auto pos = str.back().find('_'); pos != std::string::npos) { + rowset_id = str.back().substr(0, pos); + } else { + LOG(WARNING) << "failed to parse rowset_id, path=" << path; + return; + } + auto rs_meta = rowsets.find(rowset_id); + if (rs_meta != rowsets.end() && + !deleted_rowset_id.contains(rowset_id)) { + deleted_rowset_id.emplace(rowset_id); + metrics_context.total_recycled_data_size += + rs_meta->second.total_disk_size(); + segment_metrics_context_.total_recycled_num += + rs_meta->second.num_segments(); + segment_metrics_context_.total_recycled_data_size += + rs_meta->second.total_disk_size(); + metrics_context.total_recycled_num++; + } + }); segment_metrics_context_.report(); metrics_context.report(); } @@ -2000,12 +2001,10 @@ int InstanceRecycler::scan_tablets_and_statistics(int64_t table_id, int64_t inde } return 0; }; - return scan_and_recycle(tablet_key_begin, tablet_key_end, std::move(scan_and_statistics), - [&metrics_context]() -> int { - metrics_context.report(); - tablet_metrics_context_.report(); - return 0; - }); + int ret = scan_and_recycle(tablet_key_begin, tablet_key_end, std::move(scan_and_statistics)); + metrics_context.report(true); + tablet_metrics_context_.report(true); + return ret; } int InstanceRecycler::scan_tablet_and_statistics(int64_t tablet_id, @@ -2268,50 +2267,46 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id, RecyclerMetricsContext& .tag("instance id", instance_id_) .tag("tablet id", tablet_id) .tag("recycle tablet resource ids are", - std::accumulate(resource_ids.begin(), resource_ids.begin(), std::string(), + std::accumulate(resource_ids.begin(), resource_ids.end(), std::string(), [](std::string rs_id, const auto& 
it) { return rs_id.empty() ? it : rs_id + ", " + it; })); - SyncExecutor concurrent_delete_executor( + SyncExecutor> concurrent_delete_executor( _thread_pool_group.s3_producer_pool, fmt::format("delete tablet {} s3 rowset", tablet_id), - [](const int& ret) { return ret != 0; }); + [](const std::pair& ret) { return ret.first != 0; }); // delete all rowset data in this tablet // ATTN: there may be data leak if not all accessor initilized successfully // partial data deleted if the tablet is stored cross-storage vault // vault id is not attached to TabletMeta... for (const auto& resource_id : resource_ids) { - concurrent_delete_executor.add([&, rs_id = resource_id, - accessor_ptr = accessor_map_[resource_id]]() { - std::unique_ptr> defer((int*)0x01, [&](int*) { - g_bvar_recycler_vault_recycle_task_concurrency.put( - {instance_id_, metrics_context.operation_type, rs_id}, -1); - metrics_context.report(); - }); - g_bvar_recycler_vault_recycle_task_concurrency.put( - {instance_id_, metrics_context.operation_type, rs_id}, 1); - int res = accessor_ptr->delete_directory(tablet_path_prefix(tablet_id)); - if (res != 0) { - LOG(WARNING) << "failed to delete rowset data of tablet " << tablet_id - << " path=" << accessor_ptr->uri(); - g_bvar_recycler_vault_recycle_status.put({instance_id_, rs_id, "abnormal"}, 1); - return -1; - } - g_bvar_recycler_vault_recycle_status.put({instance_id_, rs_id, "normal"}, 1); - return 0; - }); + g_bvar_recycler_vault_recycle_task_status.put({instance_id_, resource_id, "submitted"}, 1); + concurrent_delete_executor.add( + [&, rs_id = resource_id, + accessor_ptr = accessor_map_[resource_id]]() -> decltype(auto) { + std::unique_ptr> defer( + (int*)0x01, [&](int*) { metrics_context.report(); }); + int res = accessor_ptr->delete_directory(tablet_path_prefix(tablet_id)); + if (res != 0) { + LOG(WARNING) << "failed to delete rowset data of tablet " << tablet_id + << " path=" << accessor_ptr->uri(); + return std::make_pair(-1, rs_id); + } + return 
std::make_pair(0, rs_id); + }); } bool finished = true; - std::vector rets = concurrent_delete_executor.when_all(&finished); - for (int r : rets) { - if (r != 0) { + std::vector> rets = concurrent_delete_executor.when_all(&finished); + for (auto& r : rets) { + if (r.first != 0) { + g_bvar_recycler_vault_recycle_task_status.put({instance_id_, r.second, "error"}, 1); ret = -1; } + g_bvar_recycler_vault_recycle_task_status.put({instance_id_, r.second, "completed"}, 1); } - ret = finished ? ret : -1; if (ret != 0) { // failed recycle tablet data @@ -3985,13 +3980,11 @@ int InstanceRecycler::scan_and_statistics_indexes() { return 0; }; - return scan_and_recycle(index_key0, index_key1, std::move(handle_index_kv), - [&metrics_context]() -> int { - metrics_context.report(true); - segment_metrics_context_.report(true); - tablet_metrics_context_.report(true); - return 0; - }); + int ret = scan_and_recycle(index_key0, index_key1, std::move(handle_index_kv)); + metrics_context.report(true); + segment_metrics_context_.report(true); + tablet_metrics_context_.report(true); + return ret; } // Scan and statistics partitions that need to be recycled @@ -4053,13 +4046,12 @@ int InstanceRecycler::scan_and_statistics_partitions() { metrics_context.total_need_recycle_num++; return ret; }; - return scan_and_recycle(part_key0, part_key1, std::move(handle_partition_kv), - [&metrics_context]() -> int { - metrics_context.report(true); - segment_metrics_context_.report(true); - tablet_metrics_context_.report(true); - return 0; - }); + + int ret = scan_and_recycle(part_key0, part_key1, std::move(handle_partition_kv)); + metrics_context.report(true); + segment_metrics_context_.report(true); + tablet_metrics_context_.report(true); + return ret; } // Scan and statistics rowsets that need to be recycled @@ -4101,15 +4093,13 @@ int InstanceRecycler::scan_and_statistics_rowsets() { metrics_context.total_need_recycle_num++; metrics_context.total_need_recycle_data_size += 
rowset_meta->total_disk_size(); segment_metrics_context_.total_need_recycle_num += rowset_meta->num_segments(); - segment_metrics_context_.total_need_recycle_data_size += rowset_meta->total_disk_size(); + segment_metrics_context_.total_need_recycle_data_size += rowset_meta->total_disk_size(); return 0; }; - return scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv), - [&metrics_context]() -> int { - metrics_context.report(true); - segment_metrics_context_.report(true); - return 0; - }); + int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv)); + metrics_context.report(true); + segment_metrics_context_.report(true); + return ret; } // Scan and statistics tmp_rowsets that need to be recycled @@ -4154,12 +4144,10 @@ int InstanceRecycler::scan_and_statistics_tmp_rowsets() { segment_metrics_context_.total_need_recycle_num += rowset.num_segments(); return 0; }; - return scan_and_recycle(tmp_rs_key0, tmp_rs_key1, std::move(handle_tmp_rowsets_kv), - [&metrics_context]() -> int { - metrics_context.report(true); - segment_metrics_context_.report(true); - return 0; - }); + int ret = scan_and_recycle(tmp_rs_key0, tmp_rs_key1, std::move(handle_tmp_rowsets_kv)); + metrics_context.report(true); + segment_metrics_context_.report(true); + return ret; } // Scan and statistics abort_timeout_txn that need to be recycled @@ -4215,11 +4203,10 @@ int InstanceRecycler::scan_and_statistics_abort_timeout_txn() { } return 0; }; - return scan_and_recycle(begin_txn_running_key, end_txn_running_key, - std::move(handle_abort_timeout_txn_kv), [&metrics_context]() -> int { - metrics_context.report(true); - return 0; - }); + + int ret = scan_and_recycle(begin_txn_running_key, end_txn_running_key, std::move(handle_abort_timeout_txn_kv)); + metrics_context.report(true); + return ret; } // Scan and statistics expired_txn_label that need to be recycled @@ -4250,11 +4237,10 @@ int InstanceRecycler::scan_and_statistics_expired_txn_label() { } return 
0; }; - return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key, - std::move(handle_expired_txn_label_kv), [&metrics_context]() -> int { - metrics_context.report(true); - return 0; - }); + + int ret = scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key, std::move(handle_expired_txn_label_kv)); + metrics_context.report(true); + return ret; } // Scan and statistics copy_jobs that need to be recycled @@ -4304,11 +4290,9 @@ int InstanceRecycler::scan_and_statistics_copy_jobs() { return 0; }; - return scan_and_recycle(key0, key1, std::move(scan_and_statistics), - [&metrics_context]() -> int { - metrics_context.report(true); - return 0; - }); + int ret = scan_and_recycle(key0, key1, std::move(scan_and_statistics)); + metrics_context.report(true); + return ret; } // Scan and statistics stage that need to be recycled @@ -4364,11 +4348,9 @@ int InstanceRecycler::scan_and_statistics_stage() { return 0; }; - return scan_and_recycle(key0, key1, std::move(scan_and_statistics), - [&metrics_context]() -> int { - metrics_context.report(true); - return 0; - }); + int ret = scan_and_recycle(key0, key1, std::move(scan_and_statistics)); + metrics_context.report(true); + return ret; } // Scan and statistics expired_stage_objects that need to be recycled @@ -4456,11 +4438,9 @@ int InstanceRecycler::scan_and_statistics_versions() { return 0; }; - return scan_and_recycle(version_key_begin, version_key_end, std::move(scan_and_statistics), - [&metrics_context]() -> int { - metrics_context.report(true); - return 0; - }); + int ret = scan_and_recycle(version_key_begin, version_key_end, std::move(scan_and_statistics)); + metrics_context.report(true); + return ret; } // Scan and statistics restore jobs that need to be recycled @@ -4492,11 +4472,9 @@ int InstanceRecycler::scan_and_statistics_restore_jobs() { return 0; }; - return scan_and_recycle(restore_job_key0, restore_job_key1, std::move(scan_and_statistics), - [&metrics_context]() -> int { - metrics_context.report(true); 
- return 0; - }); + int ret = scan_and_recycle(restore_job_key0, restore_job_key1, std::move(scan_and_statistics)); + metrics_context.report(true); + return ret; } } // namespace doris::cloud diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index 54c13432d3dc46..f204f016684cab 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include @@ -46,6 +47,8 @@ class StorageVaultAccessor; class Checker; class SimpleThreadPool; class RecyclerMetricsContext; +class TabletRecyclerMetricsContext; +class SegmentRecyclerMetricsContext; struct RecyclerThreadPoolGroup { RecyclerThreadPoolGroup() = default; RecyclerThreadPoolGroup(std::shared_ptr s3_producer_pool, @@ -119,6 +122,109 @@ enum class RowsetRecyclingState { TMP_ROWSET, }; +class RecyclerMetricsContext { +public: + RecyclerMetricsContext() = default; + + RecyclerMetricsContext(std::string instance_id, std::string operation_type) + : operation_type(std::move(operation_type)), instance_id(std::move(instance_id)) { + start(); + } + + ~RecyclerMetricsContext() = default; + + std::atomic_ullong total_need_recycle_data_size = 0; + std::atomic_ullong total_need_recycle_num = 0; + + std::atomic_ullong total_recycled_data_size = 0; + std::atomic_ullong total_recycled_num = 0; + + std::string operation_type; + std::string instance_id; + + double start_time = 0; + + void start() { + start_time = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + } + + double duration() const { + return duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count() - + start_time; + } + + void reset() { + total_need_recycle_data_size = 0; + total_need_recycle_num = 0; + total_recycled_data_size = 0; + total_recycled_num = 0; + start_time = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + } + + void finish_report() { + if (!operation_type.empty()) { + 
double cost = duration(); + g_bvar_recycler_instance_last_round_recycle_elpased_ts.put( + {instance_id, operation_type}, cost); + g_bvar_recycler_instance_recycle_round.put({instance_id, operation_type}, 1); + LOG(INFO) << "recycle instance: " << instance_id + << ", operation type: " << operation_type << ", cost: " << cost + << " ms, total recycled num: " << total_recycled_num.load() + << ", total recycled data size: " << total_recycled_data_size.load() + << " bytes"; + if (cost != 0) { + if (total_recycled_num.load() != 0) { + g_bvar_recycler_instance_recycle_time_per_resource.put( + {instance_id, operation_type}, cost / total_recycled_num.load()); + } + g_bvar_recycler_instance_recycle_bytes_per_ms.put( + {instance_id, operation_type}, total_recycled_data_size.load() / cost); + } + } + } + + // `is_begin` is used to initialize total num of items need to be recycled + void report(bool is_begin = false) { + if (!operation_type.empty()) { + // is init + if (is_begin) { + auto value = total_need_recycle_num.load(); + + g_bvar_recycler_instance_last_round_to_recycle_bytes.put( + {instance_id, operation_type}, total_need_recycle_data_size.load()); + g_bvar_recycler_instance_last_round_to_recycle_num.put( + {instance_id, operation_type}, value); + } else { + g_bvar_recycler_instance_last_round_recycled_bytes.put( + {instance_id, operation_type}, total_recycled_data_size.load()); + g_bvar_recycler_instance_recycle_total_bytes_since_started.put( + {instance_id, operation_type}, total_recycled_data_size.load()); + g_bvar_recycler_instance_last_round_recycled_num.put({instance_id, operation_type}, + total_recycled_num.load()); + g_bvar_recycler_instance_recycle_total_num_since_started.put( + {instance_id, operation_type}, total_recycled_num.load()); + } + } + } +}; + +class TabletRecyclerMetricsContext : public RecyclerMetricsContext { +public: + TabletRecyclerMetricsContext() : RecyclerMetricsContext("global_recycler", "recycle_tablet") {} +}; + +class 
SegmentRecyclerMetricsContext : public RecyclerMetricsContext { +public: + SegmentRecyclerMetricsContext() + : RecyclerMetricsContext("global_recycler", "recycle_segment") {} +}; + class InstanceRecycler { public: explicit InstanceRecycler(std::shared_ptr txn_kv, const InstanceInfoPB& instance, @@ -304,115 +410,9 @@ class InstanceRecycler { RecyclerThreadPoolGroup _thread_pool_group; std::shared_ptr txn_lazy_committer_; -}; -class RecyclerMetricsContext { -public: - RecyclerMetricsContext() = default; - - RecyclerMetricsContext(std::string instance_id, std::string operation_type) - : operation_type(std::move(operation_type)), instance_id(std::move(instance_id)) { - start(); - } - - ~RecyclerMetricsContext() = default; - - int64_t total_need_recycle_data_size = 0; - int64_t total_need_recycle_num = 0; - - std::atomic_long total_recycled_data_size {0}; - std::atomic_long total_recycled_num {0}; - - std::string operation_type {}; - std::string instance_id {}; - - double start_time = 0; - - void start() { - start_time = duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - } - - double duration() const { - return duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count() - - start_time; - } - - void reset() { - total_need_recycle_data_size = 0; - total_need_recycle_num = 0; - total_recycled_data_size.store(0); - total_recycled_num.store(0); - start_time = duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - } - - void finish_report() { - if (!operation_type.empty()) { - double cost = duration(); - g_bvar_recycler_instance_last_round_recycle_elpased_ts.put( - {instance_id, operation_type}, cost); - g_bvar_recycler_instance_recycle_round.put({instance_id, operation_type}, 1); - LOG(INFO) << "recycle instance: " << instance_id - << ", operation type: " << operation_type << ", cost: " << cost - << " ms, total recycled num: " << total_recycled_num.load() - << ", total recycled data size: " << 
total_recycled_data_size.load() - << " bytes"; - if (total_recycled_num.load()) { - g_bvar_recycler_instance_recycle_time_per_resource.put( - {instance_id, operation_type}, cost / total_recycled_num.load()); - } else { - g_bvar_recycler_instance_recycle_time_per_resource.put( - {instance_id, operation_type}, -1); - } - if (total_recycled_data_size.load()) { - g_bvar_recycler_instance_recycle_bytes_per_ms.put( - {instance_id, operation_type}, total_recycled_data_size.load() / cost); - } else { - g_bvar_recycler_instance_recycle_bytes_per_ms.put({instance_id, operation_type}, - -1); - } - } - } - - // `is_begin` is used to initialize total num of items need to be recycled - void report(bool is_begin = false) { - if (!operation_type.empty()) { - // is init - if (is_begin) { - if (total_need_recycle_data_size) { - g_bvar_recycler_instance_last_round_to_recycle_bytes.put( - {instance_id, operation_type}, total_need_recycle_data_size); - } - } else { - if (total_recycled_data_size.load()) { - g_bvar_recycler_instance_last_round_recycled_bytes.put( - {instance_id, operation_type}, total_recycled_data_size.load()); - } - g_bvar_recycler_instance_recycle_total_bytes_since_started.put( - {instance_id, operation_type}, total_recycled_data_size.load()); - } - - // is init - if (is_begin) { - if (total_need_recycle_num) { - g_bvar_recycler_instance_last_round_to_recycle_num.put( - {instance_id, operation_type}, total_need_recycle_num); - } - } else { - if (total_recycled_num.load()) { - g_bvar_recycler_instance_last_round_recycled_num.put( - {instance_id, operation_type}, total_recycled_num.load()); - } - g_bvar_recycler_instance_recycle_total_num_since_started.put( - {instance_id, operation_type}, total_recycled_num.load()); - } - } - } + TabletRecyclerMetricsContext tablet_metrics_context_; + SegmentRecyclerMetricsContext segment_metrics_context_; }; } // namespace doris::cloud diff --git a/cloud/src/recycler/recycler_service.cpp b/cloud/src/recycler/recycler_service.cpp 
index 48b2fe370ef61e..b2cd9712a4faf0 100644 --- a/cloud/src/recycler/recycler_service.cpp +++ b/cloud/src/recycler/recycler_service.cpp @@ -107,27 +107,15 @@ void RecyclerServiceImpl::statistics_recycle(StatisticsRecycleRequest& req, Meta [](InstanceRecycler& instance_recycler) { instance_recycler.scan_and_statistics_stage(); }}, - {"recycle_expired_stage_objects", - [](InstanceRecycler& instance_recycler) { + {"recycle_expired_stage_objects", [](InstanceRecycler& instance_recycler) { instance_recycler.scan_and_statistics_expired_stage_objects(); - }}, - {"recycle_tablet", - [](InstanceRecycler& instance_recycler) { - instance_recycler.scan_and_statistics_partitions(); - instance_recycler.scan_and_statistics_indexes(); - }}, - {"recycle_segment", [](InstanceRecycler& instance_recycler) { - instance_recycler.scan_and_statistics_partitions(); - instance_recycler.scan_and_statistics_indexes(); - instance_recycler.scan_and_statistics_rowsets(); - instance_recycler.scan_and_statistics_tmp_rowsets(); }}}; std::set resource_types; for (const auto& resource_type : req.resource_type()) { if (resource_type == "*") { - std::for_each(resource_handlers.begin(), resource_handlers.end(), - [&](const auto& it) { resource_types.emplace(it.first); }); + std::ranges::for_each(resource_handlers, + [&](const auto& it) { resource_types.emplace(it.first); }); break; } else { if (!resource_handlers.contains(resource_type)) { @@ -152,15 +140,14 @@ void RecyclerServiceImpl::statistics_recycle(StatisticsRecycleRequest& req, Meta for (const auto& instance_id : req.instance_ids()) { if (instance_id == "*") { - std::for_each(instances.begin(), instances.end(), [&](const InstanceInfoPB& instance) { + std::ranges::for_each(instances, [&](const InstanceInfoPB& instance) { instance_ids.emplace(instance.instance_id()); }); break; } else { - if (std::find_if(instances.begin(), instances.end(), - [&](const InstanceInfoPB& instance) { - return instance.instance_id() == instance_id; - }) == 
instances.end()) { code = MetaServiceCode::INVALID_ARGUMENT; msg = fmt::format("invalid instance id: {}", instance_id); LOG_WARNING(msg); @@ -225,24 +212,32 @@ void RecyclerServiceImpl::statistics_recycle(StatisticsRecycleRequest& req, Meta worker_pool->stop(); std::stringstream ss; - for_each(instance_ids.begin(), instance_ids.end(), [&](const std::string& id) { + std::ranges::for_each(instance_ids, [&](const std::string& id) { ss << "Instance ID: " << id << "\n"; ss << "----------------------------------------\n"; - for_each(resource_types.begin(), resource_types.end(), [&](const auto& resource_type) { - int64_t to_recycle_num = 0; - int64_t to_recycle_bytes = 0; - if (resource_type == "recycle_segment" || resource_type == "recycle_tablet") { - to_recycle_num = g_bvar_recycler_instance_last_round_to_recycle_num.get( - {"global_recycler", resource_type}); - to_recycle_bytes = g_bvar_recycler_instance_last_round_to_recycle_bytes.get( - {"global_recycler", resource_type}); - } else { - to_recycle_num = - g_bvar_recycler_instance_last_round_to_recycle_num.get({id, resource_type}); - to_recycle_bytes = g_bvar_recycler_instance_last_round_to_recycle_bytes.get( - {id, resource_type}); - } + // tablet and segment statistics are read from the fixed "global_recycler" label, not from the per-instance id + int64_t tablet_num = g_bvar_recycler_instance_last_round_to_recycle_num.get( + {"global_recycler", "recycle_tablet"}); + int64_t tablet_bytes = g_bvar_recycler_instance_last_round_to_recycle_bytes.get( + {"global_recycler", "recycle_tablet"}); + int64_t segment_num = g_bvar_recycler_instance_last_round_to_recycle_num.get( + {"global_recycler", "recycle_segment"}); + int64_t segment_bytes = g_bvar_recycler_instance_last_round_to_recycle_bytes.get( + {"global_recycler", "recycle_segment"}); + // clang-format off + ss << "Global recycler: " << "tablet and segment" << "\n"; + ss << " • Need to recycle tablet 
count: " << tablet_num << " items\n"; + ss << " • Need to recycle tablet size: " << tablet_bytes << " bytes\n"; + ss << " • Need to recycle segment count: " << segment_num << " items\n"; + ss << " • Need to recycle segment size: " << segment_bytes << " bytes\n"; + // clang-format on + + std::ranges::for_each(resource_types, [&](const auto& resource_type) { + int64_t to_recycle_num = + g_bvar_recycler_instance_last_round_to_recycle_num.get({id, resource_type}); + int64_t to_recycle_bytes = + g_bvar_recycler_instance_last_round_to_recycle_bytes.get({id, resource_type}); ss << "Task Type: " << resource_type << "\n";