Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(metrics): use macro to avoid some metrics miss flush #8193

Merged
merged 4 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,10 @@ impl HummockVersionReader {
.in_span(Span::enter_with_local_parent("rewind"))
.await?;
local_stats.found_key = user_iter.is_valid();
local_stats.sub_iter_count = local_stats.staging_imm_iter_count
+ local_stats.staging_sst_iter_count
+ local_stats.overlapping_iter_count
+ local_stats.non_overlapping_iter_count;

Ok(HummockStorageIterator::new(
user_iter,
Expand All @@ -759,7 +763,6 @@ impl HummockVersionReader {
read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
) -> StorageResult<bool> {
let table_id = read_options.table_id;
let mut table_counts = 0;
let (imms, uncommitted_ssts, committed_version) = read_version_tuple;
let mut stats_guard =
MayExistLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
Expand Down Expand Up @@ -811,7 +814,7 @@ impl HummockVersionReader {

// 2. order guarantee: imm -> sst
for local_sst in &uncommitted_ssts {
table_counts += 1;
stats_guard.local_stats.may_exist_check_sstable_count += 1;
if hit_sstable_bloom_filter(
self.sstable_store
.sstable(local_sst, &mut stats_guard.local_stats)
Expand All @@ -837,7 +840,7 @@ impl HummockVersionReader {
let sstable_infos =
prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
for sstable_info in sstable_infos {
table_counts += 1;
stats_guard.local_stats.may_exist_check_sstable_count += 1;
if hit_sstable_bloom_filter(
self.sstable_store
.sstable(sstable_info, &mut stats_guard.local_stats)
Expand All @@ -855,7 +858,7 @@ impl HummockVersionReader {
prune_nonoverlapping_ssts(&level.table_infos, &encoded_user_key_range);

for table_info in table_infos {
table_counts += 1;
stats_guard.local_stats.may_exist_check_sstable_count += 1;
if hit_sstable_bloom_filter(
self.sstable_store
.sstable(table_info, &mut stats_guard.local_stats)
Expand All @@ -871,7 +874,6 @@ impl HummockVersionReader {
}
}

stats_guard.local_stats.may_exist_check_sstable_count = table_counts;
Ok(false)
}
}
146 changes: 88 additions & 58 deletions src/storage/src/monitor/local_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@ use crate::monitor::CompactorMetrics;

thread_local!(static LOCAL_METRICS: RefCell<HashMap<u32,LocalStoreMetrics>> = RefCell::new(HashMap::default()));

macro_rules! inc_local_metrics {
($self:ident, $metrics: ident, $($x:ident),*) => {{
$(
$metrics.$x.inc_by($self.$x);
)*
}}
}

#[derive(Default, Debug)]
pub struct StoreLocalStatistic {
pub cache_data_block_miss: u64,
Expand Down Expand Up @@ -68,23 +60,14 @@ pub struct StoreLocalStatistic {

impl StoreLocalStatistic {
pub fn add(&mut self, other: &StoreLocalStatistic) {
self.cache_meta_block_miss += other.cache_meta_block_miss;
self.cache_meta_block_total += other.cache_meta_block_total;

self.cache_data_block_miss += other.cache_data_block_miss;
self.cache_data_block_total += other.cache_data_block_total;

self.skip_multi_version_key_count += other.skip_multi_version_key_count;
self.skip_delete_key_count += other.skip_delete_key_count;
self.processed_key_count += other.processed_key_count;
self.add_count(other);
self.add_histogram(other);
self.bloom_filter_true_negative_counts += other.bloom_filter_true_negative_counts;
self.remote_io_time.fetch_add(
other.remote_io_time.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.bloom_filter_check_counts += other.bloom_filter_check_counts;
self.total_key_count += other.total_key_count;
self.get_shared_buffer_hit_counts += other.get_shared_buffer_hit_counts;

#[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
if other.added.fetch_or(true, Ordering::Relaxed) || other.reported.load(Ordering::Relaxed) {
Expand All @@ -98,38 +81,12 @@ impl StoreLocalStatistic {
}

fn report(&self, metrics: &mut LocalStoreMetrics) {
inc_local_metrics!(
self,
metrics,
cache_data_block_total,
cache_data_block_miss,
cache_meta_block_total,
cache_meta_block_miss,
skip_multi_version_key_count,
skip_delete_key_count,
get_shared_buffer_hit_counts,
total_key_count,
processed_key_count
);
metrics.add_count(self);
metrics.add_histogram(self);
let t = self.remote_io_time.load(Ordering::Relaxed) as f64;
if t > 0.0 {
metrics.remote_io_time.observe(t / 1000.0);
}
metrics
.staging_imm_iter_count
.observe(self.staging_imm_iter_count as f64);
metrics
.staging_sst_iter_count
.observe(self.staging_sst_iter_count as f64);
metrics
.overlapping_iter_count
.observe(self.overlapping_iter_count as f64);
metrics
.non_overlapping_iter_count
.observe(self.non_overlapping_iter_count as f64);
metrics
.may_exist_check_sstable_count
.observe(self.may_exist_check_sstable_count as f64);

metrics.collect_count += 1;
if metrics.collect_count > FLUSH_LOCAL_METRICS_TIMES {
Expand Down Expand Up @@ -186,8 +143,9 @@ impl StoreLocalStatistic {
return;
}
// checks SST bloom filters
inc_local_metrics!(self, metrics, bloom_filter_true_negative_counts);

metrics
.bloom_filter_true_negative_counts
.inc_by(self.bloom_filter_true_negative_counts);
metrics.read_req_check_bloom_filter_counts.inc();

if self.bloom_filter_check_counts > self.bloom_filter_true_negative_counts {
Expand Down Expand Up @@ -262,6 +220,7 @@ struct LocalStoreMetrics {
overlapping_iter_count: LocalHistogram,
non_overlapping_iter_count: LocalHistogram,
may_exist_check_sstable_count: LocalHistogram,
sub_iter_count: LocalHistogram,
iter_filter_metrics: BloomFilterLocalMetrics,
get_filter_metrics: BloomFilterLocalMetrics,
may_exist_filter_metrics: BloomFilterLocalMetrics,
Expand Down Expand Up @@ -342,6 +301,10 @@ impl LocalStoreMetrics {
.iter_merge_sstable_counts
.with_label_values(&[table_id_label, "may-exist-check-sstable"])
.local();
let sub_iter_count = metrics
.iter_merge_sstable_counts
.with_label_values(&[table_id_label, "sub-iter"])
.local();
let get_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "get");
let iter_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "iter");
let may_exist_filter_metrics =
Expand All @@ -360,6 +323,7 @@ impl LocalStoreMetrics {
staging_imm_iter_count,
staging_sst_iter_count,
overlapping_iter_count,
sub_iter_count,
non_overlapping_iter_count,
may_exist_check_sstable_count,
get_filter_metrics,
Expand All @@ -370,21 +334,87 @@ impl LocalStoreMetrics {
}

pub fn flush(&mut self) {
self.cache_data_block_total.flush();
self.cache_data_block_miss.flush();
self.cache_meta_block_total.flush();
self.cache_meta_block_miss.flush();
self.remote_io_time.flush();
self.skip_multi_version_key_count.flush();
self.skip_delete_key_count.flush();
self.get_shared_buffer_hit_counts.flush();
self.total_key_count.flush();
self.processed_key_count.flush();
self.iter_filter_metrics.flush();
self.get_filter_metrics.flush();
self.flush_histogram();
self.flush_count();
}
}

macro_rules! add_local_metrics_histogram {
($($x:ident),*) => (
impl LocalStoreMetrics {
fn add_histogram(&self, stats: &StoreLocalStatistic) {
$(
self.$x.observe(stats.$x as f64);
)*
}

fn flush_histogram(&mut self) {
$(
self.$x.flush();
)*
}
}

impl StoreLocalStatistic {
fn add_histogram(&mut self, other: &StoreLocalStatistic) {
$(
self.$x += other.$x;
)*
}
}
)
}

add_local_metrics_histogram!(
staging_imm_iter_count,
staging_sst_iter_count,
overlapping_iter_count,
non_overlapping_iter_count,
sub_iter_count,
may_exist_check_sstable_count
);

macro_rules! add_local_metrics_count {
($($x:ident),*) => (
impl LocalStoreMetrics {
fn add_count(&self, stats: &StoreLocalStatistic) {
$(
self.$x.inc_by(stats.$x);
)*
}

fn flush_count(&mut self) {
$(
self.$x.flush();
)*
}
}

impl StoreLocalStatistic {
fn add_count(&mut self, other: &StoreLocalStatistic) {
$(
self.$x += other.$x;
)*
}
}
)
}

add_local_metrics_count!(
cache_data_block_total,
cache_data_block_miss,
cache_meta_block_total,
cache_meta_block_miss,
skip_multi_version_key_count,
skip_delete_key_count,
get_shared_buffer_hit_counts,
total_key_count,
processed_key_count
);

macro_rules! define_bloom_filter_metrics {
($($x:ident),*) => (
struct BloomFilterLocalMetrics {
Expand Down