Skip to content

Commit

Permalink
fix(metrics): use macro to avoid some metrics miss flush (#8193)
Browse files Browse the repository at this point in the history
* fix not flush

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* remove use less code

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* fix warn

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

---------

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
  • Loading branch information
Little-Wallace authored Feb 27, 2023
1 parent 1537880 commit a753432
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 63 deletions.
12 changes: 7 additions & 5 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,10 @@ impl HummockVersionReader {
.in_span(Span::enter_with_local_parent("rewind"))
.await?;
local_stats.found_key = user_iter.is_valid();
local_stats.sub_iter_count = local_stats.staging_imm_iter_count
+ local_stats.staging_sst_iter_count
+ local_stats.overlapping_iter_count
+ local_stats.non_overlapping_iter_count;

Ok(HummockStorageIterator::new(
user_iter,
Expand All @@ -763,7 +767,6 @@ impl HummockVersionReader {
read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
) -> StorageResult<bool> {
let table_id = read_options.table_id;
let mut table_counts = 0;
let (imms, uncommitted_ssts, committed_version) = read_version_tuple;
let mut stats_guard =
MayExistLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
Expand Down Expand Up @@ -815,7 +818,7 @@ impl HummockVersionReader {

// 2. order guarantee: imm -> sst
for local_sst in &uncommitted_ssts {
table_counts += 1;
stats_guard.local_stats.may_exist_check_sstable_count += 1;
if hit_sstable_bloom_filter(
self.sstable_store
.sstable(local_sst, &mut stats_guard.local_stats)
Expand All @@ -841,7 +844,7 @@ impl HummockVersionReader {
let sstable_infos =
prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
for sstable_info in sstable_infos {
table_counts += 1;
stats_guard.local_stats.may_exist_check_sstable_count += 1;
if hit_sstable_bloom_filter(
self.sstable_store
.sstable(sstable_info, &mut stats_guard.local_stats)
Expand All @@ -859,7 +862,7 @@ impl HummockVersionReader {
prune_nonoverlapping_ssts(&level.table_infos, &encoded_user_key_range);

for table_info in table_infos {
table_counts += 1;
stats_guard.local_stats.may_exist_check_sstable_count += 1;
if hit_sstable_bloom_filter(
self.sstable_store
.sstable(table_info, &mut stats_guard.local_stats)
Expand All @@ -875,7 +878,6 @@ impl HummockVersionReader {
}
}

stats_guard.local_stats.may_exist_check_sstable_count = table_counts;
Ok(false)
}
}
146 changes: 88 additions & 58 deletions src/storage/src/monitor/local_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@ use crate::monitor::CompactorMetrics;

thread_local!(static LOCAL_METRICS: RefCell<HashMap<u32,LocalStoreMetrics>> = RefCell::new(HashMap::default()));

macro_rules! inc_local_metrics {
($self:ident, $metrics: ident, $($x:ident),*) => {{
$(
$metrics.$x.inc_by($self.$x);
)*
}}
}

#[derive(Default, Debug)]
pub struct StoreLocalStatistic {
pub cache_data_block_miss: u64,
Expand Down Expand Up @@ -68,23 +60,14 @@ pub struct StoreLocalStatistic {

impl StoreLocalStatistic {
pub fn add(&mut self, other: &StoreLocalStatistic) {
self.cache_meta_block_miss += other.cache_meta_block_miss;
self.cache_meta_block_total += other.cache_meta_block_total;

self.cache_data_block_miss += other.cache_data_block_miss;
self.cache_data_block_total += other.cache_data_block_total;

self.skip_multi_version_key_count += other.skip_multi_version_key_count;
self.skip_delete_key_count += other.skip_delete_key_count;
self.processed_key_count += other.processed_key_count;
self.add_count(other);
self.add_histogram(other);
self.bloom_filter_true_negative_counts += other.bloom_filter_true_negative_counts;
self.remote_io_time.fetch_add(
other.remote_io_time.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.bloom_filter_check_counts += other.bloom_filter_check_counts;
self.total_key_count += other.total_key_count;
self.get_shared_buffer_hit_counts += other.get_shared_buffer_hit_counts;

#[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
if other.added.fetch_or(true, Ordering::Relaxed) || other.reported.load(Ordering::Relaxed) {
Expand All @@ -98,38 +81,12 @@ impl StoreLocalStatistic {
}

fn report(&self, metrics: &mut LocalStoreMetrics) {
inc_local_metrics!(
self,
metrics,
cache_data_block_total,
cache_data_block_miss,
cache_meta_block_total,
cache_meta_block_miss,
skip_multi_version_key_count,
skip_delete_key_count,
get_shared_buffer_hit_counts,
total_key_count,
processed_key_count
);
metrics.add_count(self);
metrics.add_histogram(self);
let t = self.remote_io_time.load(Ordering::Relaxed) as f64;
if t > 0.0 {
metrics.remote_io_time.observe(t / 1000.0);
}
metrics
.staging_imm_iter_count
.observe(self.staging_imm_iter_count as f64);
metrics
.staging_sst_iter_count
.observe(self.staging_sst_iter_count as f64);
metrics
.overlapping_iter_count
.observe(self.overlapping_iter_count as f64);
metrics
.non_overlapping_iter_count
.observe(self.non_overlapping_iter_count as f64);
metrics
.may_exist_check_sstable_count
.observe(self.may_exist_check_sstable_count as f64);

metrics.collect_count += 1;
if metrics.collect_count > FLUSH_LOCAL_METRICS_TIMES {
Expand Down Expand Up @@ -186,8 +143,9 @@ impl StoreLocalStatistic {
return;
}
// checks SST bloom filters
inc_local_metrics!(self, metrics, bloom_filter_true_negative_counts);

metrics
.bloom_filter_true_negative_counts
.inc_by(self.bloom_filter_true_negative_counts);
metrics.read_req_check_bloom_filter_counts.inc();

if self.bloom_filter_check_counts > self.bloom_filter_true_negative_counts {
Expand Down Expand Up @@ -262,6 +220,7 @@ struct LocalStoreMetrics {
overlapping_iter_count: LocalHistogram,
non_overlapping_iter_count: LocalHistogram,
may_exist_check_sstable_count: LocalHistogram,
sub_iter_count: LocalHistogram,
iter_filter_metrics: BloomFilterLocalMetrics,
get_filter_metrics: BloomFilterLocalMetrics,
may_exist_filter_metrics: BloomFilterLocalMetrics,
Expand Down Expand Up @@ -342,6 +301,10 @@ impl LocalStoreMetrics {
.iter_merge_sstable_counts
.with_label_values(&[table_id_label, "may-exist-check-sstable"])
.local();
let sub_iter_count = metrics
.iter_merge_sstable_counts
.with_label_values(&[table_id_label, "sub-iter"])
.local();
let get_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "get");
let iter_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "iter");
let may_exist_filter_metrics =
Expand All @@ -360,6 +323,7 @@ impl LocalStoreMetrics {
staging_imm_iter_count,
staging_sst_iter_count,
overlapping_iter_count,
sub_iter_count,
non_overlapping_iter_count,
may_exist_check_sstable_count,
get_filter_metrics,
Expand All @@ -370,21 +334,87 @@ impl LocalStoreMetrics {
}

pub fn flush(&mut self) {
self.cache_data_block_total.flush();
self.cache_data_block_miss.flush();
self.cache_meta_block_total.flush();
self.cache_meta_block_miss.flush();
self.remote_io_time.flush();
self.skip_multi_version_key_count.flush();
self.skip_delete_key_count.flush();
self.get_shared_buffer_hit_counts.flush();
self.total_key_count.flush();
self.processed_key_count.flush();
self.iter_filter_metrics.flush();
self.get_filter_metrics.flush();
self.flush_histogram();
self.flush_count();
}
}

macro_rules! add_local_metrics_histogram {
($($x:ident),*) => (
impl LocalStoreMetrics {
fn add_histogram(&self, stats: &StoreLocalStatistic) {
$(
self.$x.observe(stats.$x as f64);
)*
}

fn flush_histogram(&mut self) {
$(
self.$x.flush();
)*
}
}

impl StoreLocalStatistic {
fn add_histogram(&mut self, other: &StoreLocalStatistic) {
$(
self.$x += other.$x;
)*
}
}
)
}

add_local_metrics_histogram!(
staging_imm_iter_count,
staging_sst_iter_count,
overlapping_iter_count,
non_overlapping_iter_count,
sub_iter_count,
may_exist_check_sstable_count
);

macro_rules! add_local_metrics_count {
($($x:ident),*) => (
impl LocalStoreMetrics {
fn add_count(&self, stats: &StoreLocalStatistic) {
$(
self.$x.inc_by(stats.$x);
)*
}

fn flush_count(&mut self) {
$(
self.$x.flush();
)*
}
}

impl StoreLocalStatistic {
fn add_count(&mut self, other: &StoreLocalStatistic) {
$(
self.$x += other.$x;
)*
}
}
)
}

add_local_metrics_count!(
cache_data_block_total,
cache_data_block_miss,
cache_meta_block_total,
cache_meta_block_miss,
skip_multi_version_key_count,
skip_delete_key_count,
get_shared_buffer_hit_counts,
total_key_count,
processed_key_count
);

macro_rules! define_bloom_filter_metrics {
($($x:ident),*) => (
struct BloomFilterLocalMetrics {
Expand Down

0 comments on commit a753432

Please sign in to comment.