1+ #pragma once
2+
3+ #include < atomic>
4+ #include < sstream>
5+ #include < thread>
6+ #include < vector>
7+ #include < ylt/metric/counter.hpp>
8+ #include < ylt/metric/histogram.hpp>
9+ #include < ylt/metric/summary.hpp>
10+ #include " utils.h"
11+
12+ namespace mooncake {
13+
// Histogram bucket boundaries for latency metrics, in microseconds.
// Tuned for RDMA: fine-grained up to 1ms, with an ms-scale tail up to 1s.
//
// Declared `inline` so that every translation unit including this header
// refers to the same object. A plain namespace-scope `const` has internal
// linkage, which gives each TU its own copy and makes the inline member
// functions below odr-use a different object in each TU.
inline const std::vector<double> kLatencyBucket = {
    // sub-ms to 1ms region
    125, 150, 200, 250, 300, 400, 500, 750, 1000,
    // ms-level tail for batch/occasional spikes
    1500, 2000, 3000, 5000, 7000, 15000, 20000,
    // safeguards for long tails
    50000, 100000, 200000, 500000, 1000000};
23+
24+ struct TransferMetric {
25+ ylt::metric::counter_t total_read_bytes{" mooncake_transfer_read_bytes" ,
26+ " Total bytes read" };
27+ ylt::metric::counter_t total_write_bytes{" mooncake_transfer_write_bytes" ,
28+ " Total bytes written" };
29+ ylt::metric::histogram_t batch_put_latency_us{
30+ " mooncake_transfer_batch_put_latency" ,
31+ " Batch Put transfer latency (us)" , kLatencyBucket };
32+ ylt::metric::histogram_t batch_get_latency_us{
33+ " mooncake_transfer_batch_get_latency" ,
34+ " Batch Get transfer latency (us)" , kLatencyBucket };
35+ ylt::metric::histogram_t get_latency_us{" mooncake_transfer_get_latency" ,
36+ " Get transfer latency (us)" ,
37+ kLatencyBucket };
38+ ylt::metric::histogram_t put_latency_us{" mooncake_transfer_put_latency" ,
39+ " Put transfer latency (us)" ,
40+ kLatencyBucket };
41+
42+ void serialize (std::string& str) {
43+ total_read_bytes.serialize (str);
44+ total_write_bytes.serialize (str);
45+ batch_put_latency_us.serialize (str);
46+ batch_get_latency_us.serialize (str);
47+ get_latency_us.serialize (str);
48+ put_latency_us.serialize (str);
49+ }
50+
51+ std::string summary_metrics () {
52+ std::stringstream ss;
53+ ss << " === Transfer Metrics Summary ===\n " ;
54+
55+ // Bytes transferred
56+ auto read_bytes = total_read_bytes.value ();
57+ auto write_bytes = total_write_bytes.value ();
58+ ss << " Total Read: " << byte_size_to_string (read_bytes) << " \n " ;
59+ ss << " Total Write: " << byte_size_to_string (write_bytes) << " \n " ;
60+
61+ // Latency summaries
62+ ss << " \n === Latency Summary (microseconds) ===\n " ;
63+ ss << " Get: " << format_latency_summary (get_latency_us) << " \n " ;
64+ ss << " Put: " << format_latency_summary (put_latency_us) << " \n " ;
65+ ss << " Batch Get: " << format_latency_summary (batch_get_latency_us)
66+ << " \n " ;
67+ ss << " Batch Put: " << format_latency_summary (batch_put_latency_us)
68+ << " \n " ;
69+
70+ return ss.str ();
71+ }
72+
73+ private:
74+ std::string format_latency_summary (ylt::metric::histogram_t & hist) {
75+ // Access the internal sum and bucket counts
76+ auto sum_ptr =
77+ const_cast <ylt::metric::histogram_t &>(hist).get_bucket_counts ();
78+ if (sum_ptr.empty ()) {
79+ return " No data" ;
80+ }
81+
82+ // Calculate total count from all buckets
83+ int64_t total_count = 0 ;
84+ for (auto & bucket : sum_ptr) {
85+ total_count += bucket->value ();
86+ }
87+
88+ if (total_count == 0 ) {
89+ return " No data" ;
90+ }
91+
92+ // Get sum from the histogram's internal sum gauge
93+ // Note: We need to access the private sum_ member, which requires
94+ // friendship or reflection For now, let's use a simpler approach
95+ // showing just count
96+ std::stringstream ss;
97+ ss << " count=" << total_count;
98+
99+ // Find P95
100+ int64_t p95_target = (total_count * 95 ) / 100 ;
101+ int64_t cumulative = 0 ;
102+ double p95_bucket = 0 ;
103+
104+ for (size_t i = 0 ; i < sum_ptr.size () && i < kLatencyBucket .size ();
105+ i++) {
106+ cumulative += sum_ptr[i]->value ();
107+ if (cumulative >= p95_target && p95_bucket == 0 ) {
108+ p95_bucket = kLatencyBucket [i];
109+ break ;
110+ }
111+ }
112+
113+ if (p95_bucket > 0 ) {
114+ ss << " , p95<" << p95_bucket << " μs" ;
115+ }
116+
117+ // Find max bucket (highest bucket with data)
118+ double max_bucket = 0 ;
119+ for (size_t i = sum_ptr.size (); i > 0 ; i--) {
120+ size_t idx = i - 1 ;
121+ if (idx < kLatencyBucket .size () && sum_ptr[idx]->value () > 0 ) {
122+ max_bucket = kLatencyBucket [idx];
123+ break ;
124+ }
125+ }
126+
127+ if (max_bucket > 0 ) {
128+ ss << " , max<" << max_bucket << " μs" ;
129+ }
130+
131+ return ss.str ();
132+ }
133+ };
134+
135+ struct MasterClientMetric {
136+ std::array<std::string, 1 > rpc_names = {" rpc_name" };
137+
138+ MasterClientMetric ()
139+ : rpc_count(" mooncake_client_rpc_count" ,
140+ " Total number of RPC calls made by the client" , rpc_names),
141+ rpc_latency (" mooncake_client_rpc_latency" ,
142+ " Latency of RPC calls made by the client (in us)" ,
143+ kLatencyBucket , rpc_names) {}
144+
145+ ylt::metric::dynamic_counter_1t rpc_count;
146+ ylt::metric::dynamic_histogram_1t rpc_latency;
147+ void serialize (std::string& str) {
148+ rpc_count.serialize (str);
149+ rpc_latency.serialize (str);
150+ }
151+
152+ std::string summary_metrics () {
153+ std::stringstream ss;
154+ ss << " === RPC Metrics Summary ===\n " ;
155+
156+ // For dynamic metrics, we need to check if there are any labels with
157+ // data
158+ if (rpc_count.label_value_count () == 0 ) {
159+ ss << " No RPC calls recorded\n " ;
160+ return ss.str ();
161+ }
162+
163+ // Get all available RPC names from the dynamic metrics
164+ // We'll iterate through all possible RPC names instead of using a fixed
165+ // list
166+ std::vector<std::string> all_rpc_names = {" GetReplicaList" ,
167+ " PutStart" ,
168+ " PutEnd" ,
169+ " PutRevoke" ,
170+ " ExistKey" ,
171+ " Remove" ,
172+ " RemoveAll" ,
173+ " MountSegment" ,
174+ " UnmountSegment" ,
175+ " GetFsdir" ,
176+ " BatchGetReplicaList" ,
177+ " BatchPutStart" ,
178+ " BatchPutEnd" ,
179+ " BatchPutRevoke" };
180+
181+ bool found_any = false ;
182+ for (const auto & rpc_name : all_rpc_names) {
183+ std::array<std::string, 1 > label_array = {rpc_name};
184+
185+ // Check if this RPC has any data by trying to access bucket counts
186+ auto bucket_counts = rpc_latency.get_bucket_counts ();
187+ int64_t total_count = 0 ;
188+ for (auto & bucket : bucket_counts) {
189+ total_count += bucket->value (label_array);
190+ }
191+
192+ // Skip RPCs with zero count
193+ if (total_count == 0 ) continue ;
194+
195+ found_any = true ;
196+ ss << rpc_name << " : count=" << total_count;
197+
198+ // Find P95
199+ int64_t p95_target = (total_count * 95 ) / 100 ;
200+ int64_t cumulative = 0 ;
201+ double p95_bucket = 0 ;
202+
203+ for (size_t i = 0 ;
204+ i < bucket_counts.size () && i < kLatencyBucket .size (); i++) {
205+ cumulative += bucket_counts[i]->value (label_array);
206+ if (cumulative >= p95_target && p95_bucket == 0 ) {
207+ p95_bucket = kLatencyBucket [i];
208+ break ;
209+ }
210+ }
211+
212+ if (p95_bucket > 0 ) {
213+ ss << " , p95<" << p95_bucket << " μs" ;
214+ }
215+
216+ // Find max bucket (highest bucket with data)
217+ double max_bucket = 0 ;
218+ for (size_t i = bucket_counts.size (); i > 0 ; i--) {
219+ size_t idx = i - 1 ;
220+ if (idx < kLatencyBucket .size () &&
221+ bucket_counts[idx]->value (label_array) > 0 ) {
222+ max_bucket = kLatencyBucket [idx];
223+ break ;
224+ }
225+ }
226+
227+ if (max_bucket > 0 ) {
228+ ss << " , max<" << max_bucket << " μs" ;
229+ }
230+
231+ ss << " \n " ;
232+ }
233+
234+ if (!found_any) {
235+ ss << " No RPC calls recorded\n " ;
236+ }
237+
238+ return ss.str ();
239+ }
240+ };
241+
242+ struct ClientMetric {
243+ TransferMetric transfer_metric;
244+ MasterClientMetric master_client_metric;
245+
246+ /* *
247+ * @brief Creates a ClientMetric instance based on environment variables
248+ * @return std::unique_ptr<ClientMetric> containing the instance if enabled,
249+ * nullptr if disabled
250+ *
251+ * Environment variables:
252+ * - MC_STORE_CLIENT_METRIC: Enable/disable metrics (enabled by default,
253+ * set to 0/false to disable)
254+ * - MC_STORE_CLIENT_METRIC_INTERVAL: Reporting interval in seconds
255+ * (default: 0, 0 = collect but don't report)
256+ */
257+ static std::unique_ptr<ClientMetric> Create ();
258+
259+ void serialize (std::string& str);
260+ std::string summary_metrics ();
261+
262+ uint64_t GetReportingInterval () const { return metrics_interval_seconds_; }
263+
264+ explicit ClientMetric (uint64_t interval_seconds = 0 );
265+ ~ClientMetric ();
266+
267+ private:
268+ // Metrics reporting thread management
269+ std::jthread metrics_reporting_thread_;
270+ std::atomic<bool > should_stop_metrics_thread_{false };
271+ uint64_t metrics_interval_seconds_{0 };
272+
273+ void StartMetricsReportingThread ();
274+ void StopMetricsReportingThread ();
275+ };
276+ }; // namespace mooncake
0 commit comments