Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

#include "common/bvars.h"

#include <bvar/multi_dimension.h>
#include <bvar/reducer.h>
#include <bvar/status.h>

#include <cstdint>
#include <stdexcept>

Expand Down Expand Up @@ -98,6 +102,15 @@ BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_partition_earlest_ts("recycle
BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_rowset_earlest_ts("recycler", "recycle_rowset_earlest_ts");
BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_tmp_rowset_earlest_ts("recycler", "recycle_tmp_rowset_earlest_ts");
BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_expired_txn_label_earlest_ts("recycler", "recycle_expired_txn_label_earlest_ts");
// Configured recycle concurrency; set from config::recycle_concurrency in Recycler::start().
bvar::Status<int64_t> g_bvar_recycler_task_max_concurrency("recycler_task_max_concurrency_num",0);
// Incremented by 1 before and decremented by 1 after each do_recycle() call in
// Recycler::recycle_callback().
// NOTE(review): constructed without a name, so it is presumably not exposed -- confirm
// whether a name/expose() call was intended.
bvar::Adder<int64_t> g_bvar_recycler_task_concurrency;

// recycler's mbvars (all keyed by the single dimension "instance_id")
// +1 when an instance starts recycling, -1 when do_recycle() returns.
mBvarIntAdder g_bvar_recycler_instance_running("recycler_instance_running",{"instance_id"});
// Wall-clock duration of the last recycle round, in milliseconds.
mBvarLongStatus g_bvar_recycler_instance_last_recycle_duration("recycler_instance_last_recycle_duration_ms",{"instance_id"});
// NOTE(review): the metric name ends in "_s", but recycle_callback() stores
// `now_ms + config::recycle_interval_seconds * 1000`, which looks like milliseconds --
// confirm the intended unit.
mBvarLongStatus g_bvar_recycler_instance_next_time("recycler_instance_next_time_s",{"instance_id"});
// Pair {start_ms, end_ms} of the current/last round; end is -1 while the round is running.
mBvarPairStatus<int64_t> g_bvar_recycler_instance_recycle_times("recycler_instance_recycle_times",{"instance_id"});
// NOTE(review): the visible part of recycle_callback() sets this to `now` without
// checking the do_recycle() return value -- confirm it should only record successes.
mBvarLongStatus g_bvar_recycler_instance_recycle_last_success_times("recycler_instance_recycle_last_success_times",{"instance_id"});

// txn_kv's bvars
bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
Expand Down
107 changes: 107 additions & 0 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
#include <bthread/mutex.h>
#include <bvar/bvar.h>
#include <bvar/latency_recorder.h>
#include <bvar/multi_dimension.h>
#include <bvar/reducer.h>

#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <type_traits>
#include <utility>

/**
* Manage bvars with similar names (identical prefix)
Expand Down Expand Up @@ -97,6 +100,102 @@ template <typename T>
requires std::is_integral_v<T>
using BvarStatusWithTag = BvarWithTag<bvar::Status<T>, true>;

/**
 * @brief A thin wrapper around bvar::MultiDimension for labelled (multi-dimensional) metrics.
 *
 * The wrapper owns one bvar::MultiDimension<BvarType> and exposes put()/get() keyed by a
 * list of dimension values. Supported BvarType families are bvar::Adder<T>,
 * bvar::Maxer<T>, bvar::Status<T>, bvar::IntRecorder and bvar::LatencyRecorder; any other
 * type is rejected at compile time.
 *
 * @tparam BvarType The bvar type stored per combination of dimension values.
 *
 * Output format (as produced by the bvar multi-dimension implementation):
 *     {metric_name}{dimension1="value1",dimension2="value2",...} value
 *
 * Example with an Adder:
 *     // Create a 2-dimensional counter with dimensions "region" and "service"
 *     mBvarWrapper<bvar::Adder<int>> request_counter("xxx_request_count", {"region", "service"});
 *     // Increment the counter for specific dimension values
 *     request_counter.put({"east", "login"}, 1);
 *     request_counter.put({"west", "search"}, 1);
 *     request_counter.put({"east", "login"}, 1); // Now east/login has value 2
 *     // the output of above metrics:
 *     xxx_request_count{region="east",service="login"} 2
 *     xxx_request_count{region="west",service="search"} 1
 *
 * @note The number of values passed to put()/get() must match the number of dimension
 *       names given to the constructor. How a value is combined (added, maxed, replaced,
 *       recorded) depends on BvarType.
 */
template <typename BvarType>
class mBvarWrapper {
public:
    /**
     * @param metric_name Name passed to the underlying bvar::MultiDimension.
     * @param dim_names   Names of the dimensions (labels) of this metric.
     */
    mBvarWrapper(const std::string& metric_name,
                 const std::initializer_list<std::string>& dim_names)
            : counter_(metric_name, std::list<std::string>(dim_names)) {
        static_assert(is_valid_bvar_type<BvarType>::value,
                      "BvarType must be one of the supported bvar types (Adder, IntRecorder, "
                      "LatencyRecorder, Maxer, Status)");
    }

    /**
     * Feed @p value into the bvar selected by @p dim_values.
     *
     * Every bvar::Status<T> is updated through set_value(); all other supported types are
     * updated through operator<<. The call is a no-op when the underlying
     * MultiDimension::get_stats() returns null.
     */
    template <typename ValType>
    void put(const std::initializer_list<std::string>& dim_values, ValType value) {
        BvarType* stats = counter_.get_stats(std::list<std::string>(dim_values));
        if (stats == nullptr) {
            return;
        }
        if constexpr (is_status<BvarType>::value) {
            stats->set_value(value);
        } else {
            *stats << value;
        }
    }

    /**
     * Read the current value of the bvar selected by @p dim_values.
     *
     * @return stats->get_value(), or a value-initialized object of the same type when
     *         get_stats() returns null. Only usable with a BvarType that provides
     *         get_value().
     */
    auto get(const std::initializer_list<std::string>& dim_values) {
        BvarType* stats = counter_.get_stats(std::list<std::string>(dim_values));
        // decay_t keeps both return statements deducing the same (non-reference) type.
        using ValueType = std::decay_t<decltype(stats->get_value())>;
        if (stats != nullptr) {
            return stats->get_value();
        }
        return ValueType {};
    }

private:
    // Whitelist of supported bvar types. The non-template recorder types are matched in
    // the primary template (avoids explicit specializations at class scope, which not
    // every compiler accepts); the templated families use partial specializations.
    template <typename T>
    struct is_valid_bvar_type
            : std::bool_constant<std::is_same_v<T, bvar::IntRecorder> ||
                                 std::is_same_v<T, bvar::LatencyRecorder>> {};
    template <typename T>
    struct is_valid_bvar_type<bvar::Adder<T>> : std::true_type {};
    template <typename T>
    struct is_valid_bvar_type<bvar::Maxer<T>> : std::true_type {};
    template <typename T>
    struct is_valid_bvar_type<bvar::Status<T>> : std::true_type {};

    // True for every bvar::Status<T> (including Status<std::pair<T, T>>); these are
    // written with set_value() instead of operator<<.
    template <typename T>
    struct is_status : std::false_type {};
    template <typename T>
    struct is_status<bvar::Status<T>> : std::true_type {};

    bvar::MultiDimension<BvarType> counter_;
};

// Convenience aliases for the mBvarWrapper instantiations, named after the
// wrapped bvar type and its value type.
using mBvarIntAdder = mBvarWrapper<bvar::Adder<int>>;
using mBvarDoubleAdder = mBvarWrapper<bvar::Adder<double>>;
using mBvarIntRecorder = mBvarWrapper<bvar::IntRecorder>;
using mBvarLatencyRecorder = mBvarWrapper<bvar::LatencyRecorder>;
using mBvarIntMaxer = mBvarWrapper<bvar::Maxer<int>>;
using mBvarDoubleMaxer = mBvarWrapper<bvar::Maxer<double>>;
// Note: the value type is `long`, not int64_t; the two are distinct types on platforms
// where int64_t is `long long`.
using mBvarLongStatus = mBvarWrapper<bvar::Status<long>>;
using mBvarDoubleStatus = mBvarWrapper<bvar::Status<double>>;

namespace std {
// Stream a std::pair as "{first,second}".
// Presumably required so that bvar::Status<std::pair<T, T>> can print its value, and
// placed in namespace std so argument-dependent lookup finds it for std::pair -- confirm.
template <typename T1, typename T2>
inline std::ostream& operator<<(std::ostream& out, const std::pair<T1, T2>& kv) {
    out << "{" << kv.first;
    out << "," << kv.second;
    out << "}";
    return out;
}
} // namespace std

// Multi-dimensional status metric whose value is a std::pair<T, T>.
// Presumably relies on the operator<< for std::pair declared in this header to render
// the value -- confirm.
template <typename T>
using mBvarPairStatus = mBvarWrapper<bvar::Status<std::pair<T, T>>>;

// meta-service's bvars
extern BvarLatencyRecorderWithTag g_bvar_ms_begin_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_precommit_txn;
Expand Down Expand Up @@ -171,6 +270,14 @@ extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_rowset_earlest_ts;
extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_tmp_rowset_earlest_ts;
extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_expired_txn_label_earlest_ts;

// recycler's concurrency bvars
extern bvar::Status<int64_t> g_bvar_recycler_task_max_concurrency;
extern bvar::Adder<int64_t> g_bvar_recycler_task_concurrency;
// recycler's per-instance multi-dimensional bvars (defined in bvars.cpp with the
// single dimension "instance_id")
extern mBvarIntAdder g_bvar_recycler_instance_running;
extern mBvarLongStatus g_bvar_recycler_instance_last_recycle_duration;
extern mBvarLongStatus g_bvar_recycler_instance_next_time;
extern mBvarPairStatus<int64_t> g_bvar_recycler_instance_recycle_times;
extern mBvarLongStatus g_bvar_recycler_instance_recycle_last_success_times;

// txn_kv's bvars
extern bvar::LatencyRecorder g_bvar_txn_kv_get;
extern bvar::LatencyRecorder g_bvar_txn_kv_range_get;
Expand Down
2 changes: 2 additions & 0 deletions cloud/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ int main(int argc, char** argv) {
std::cout << "try to start meta_service, recycler" << std::endl;
}

google::SetCommandLineOption("bvar_max_dump_multi_dimension_metric_number", "2000");

brpc::Server server;
brpc::FLAGS_max_body_size = config::brpc_max_body_size;
brpc::FLAGS_socket_max_unwritten_bytes = config::brpc_socket_max_unwritten_bytes;
Expand Down
25 changes: 22 additions & 3 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

#include "recycler/recycler.h"

#include <brpc/builtin_service.pb.h>
#include <brpc/server.h>
#include <butil/endpoint.h>
#include <bvar/status.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>

Expand All @@ -27,9 +29,11 @@
#include <cstddef>
#include <cstdint>
#include <deque>
#include <initializer_list>
#include <numeric>
#include <string>
#include <string_view>
#include <utility>

#include "common/stopwatch.h"
#include "meta-service/meta_service.h"
Expand Down Expand Up @@ -275,7 +279,12 @@ void Recycler::recycle_callback() {
if (stopped()) return;
LOG_INFO("begin to recycle instance").tag("instance_id", instance_id);
auto ctime_ms = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
g_bvar_recycler_task_concurrency << 1;
g_bvar_recycler_instance_running.put({instance_id}, 1);
g_bvar_recycler_instance_recycle_times.put({instance_id}, std::make_pair(ctime_ms, -1));
ret = instance_recycler->do_recycle();
g_bvar_recycler_task_concurrency << -1;
g_bvar_recycler_instance_running.put({instance_id}, -1);
// If instance recycler has been aborted, don't finish this job
if (!instance_recycler->stopped()) {
finish_instance_recycle_job(txn_kv_.get(), recycle_job_key, instance_id, ip_port_,
Expand All @@ -285,9 +294,18 @@ void Recycler::recycle_callback() {
std::lock_guard lock(mtx_);
recycling_instance_map_.erase(instance_id);
}
auto elpased_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() -
ctime_ms;
auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
auto elpased_ms = now - ctime_ms;
g_bvar_recycler_instance_recycle_times.put({instance_id}, std::make_pair(ctime_ms, now));
g_bvar_recycler_instance_last_recycle_duration.put({instance_id}, elpased_ms);
g_bvar_recycler_instance_next_time.put({instance_id},
now + config::recycle_interval_seconds * 1000);
LOG(INFO) << "recycle instance done, "
<< "instance_id=" << instance_id << " ret=" << ret << " ctime_ms: " << ctime_ms
<< " now: " << now;

g_bvar_recycler_instance_recycle_last_success_times.put({instance_id}, now);

LOG_INFO("finish recycle instance")
.tag("instance_id", instance_id)
.tag("cost_ms", elpased_ms);
Expand Down Expand Up @@ -344,6 +362,7 @@ void Recycler::check_recycle_tasks() {

int Recycler::start(brpc::Server* server) {
instance_filter_.reset(config::recycle_whitelist, config::recycle_blacklist);
g_bvar_recycler_task_max_concurrency.set_value(config::recycle_concurrency);

if (config::enable_checker) {
checker_ = std::make_unique<Checker>(txn_kv_);
Expand Down
Loading