Skip to content

Commit

Permalink
feat(storage): monitor avg_key_size and avg_epoch_count (risingwavela…
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Mar 9, 2023
1 parent 953e4b2 commit f12b263
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 32 deletions.
13 changes: 12 additions & 1 deletion grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ def section_compaction(outer_panels):
"Total bytes gotten from sstable_avg_key_size, for observing sstable_avg_key_size",
[
panels.target(
f"sum by(le, job, instance)(rate({metric('compactor_sstable_avg_key_size_sum')}[$__rate_interval])) / sum by(le, job, instance)(rate({metric('state_store_sstable_avg_key_size_count')}[$__rate_interval]))",
f"sum by(le, job, instance)(rate({metric('compactor_sstable_avg_key_size_sum')}[$__rate_interval])) / sum by(le, job, instance)(rate({metric('compactor_sstable_avg_key_size_count')}[$__rate_interval]))",
"avg_key_size - {{job}} @ {{instance}}",
),
panels.target(
Expand All @@ -778,6 +778,17 @@ def section_compaction(outer_panels):
],
),

panels.timeseries_count(
"Hummock Sstable Stat",
"Avg count gotten from sstable_distinct_epoch_count, for observing sstable_distinct_epoch_count",
[
panels.target(
f"sum by(le, job, instance)(rate({metric('compactor_sstable_distinct_epoch_count_sum')}[$__rate_interval])) / sum by(le, job, instance)(rate({metric('compactor_sstable_distinct_epoch_count_count')}[$__rate_interval]))",
"avg_epoch_count - {{job}} @ {{instance}}",
),
],
),

panels.timeseries_latency(
"Hummock Remote Read Duration",
"Total time of operations which read from remote storage when enable prefetch",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

84 changes: 55 additions & 29 deletions src/storage/src/hummock/sstable/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub struct SstableBuilderOutput<WO> {
pub writer_output: WO,
pub avg_key_size: usize,
pub avg_value_size: usize,
pub epoch_count: usize,
}

pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
Expand Down Expand Up @@ -119,8 +120,7 @@ pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {

filter_builder: F,

min_epoch: u64,
max_epoch: u64,
epoch_set: BTreeSet<u64>,
}

impl<W: SstableWriter> SstableBuilder<W, XorFilterBuilder> {
Expand Down Expand Up @@ -168,8 +168,7 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
total_key_count: 0,
table_stats: Default::default(),
last_table_stats: Default::default(),
min_epoch: u64::MAX,
max_epoch: u64::MIN,
epoch_set: BTreeSet::default(),
}
}

Expand Down Expand Up @@ -238,6 +237,8 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
self.total_key_count += 1;
self.last_table_stats.total_key_count += 1;

self.epoch_set.insert(full_key.epoch);

if is_new_table && !self.block_builder.is_empty() {
self.build_block().await?;
}
Expand All @@ -263,9 +264,6 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
self.raw_key.clear();
self.raw_value.clear();

self.min_epoch = cmp::min(self.min_epoch, full_key.epoch);
self.max_epoch = cmp::max(self.max_epoch, full_key.epoch);

if self.block_builder.approximate_len() >= self.options.block_capacity {
self.build_block().await?;
}
Expand Down Expand Up @@ -363,6 +361,48 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
(tombstone_min_epoch, tombstone_max_epoch)
};

let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
(0, 0)
} else {
let total_key_count: usize = self
.table_stats
.values()
.map(|s| s.total_key_count as usize)
.sum();

if total_key_count == 0 {
(0, 0)
} else {
let total_key_size: usize = self
.table_stats
.values()
.map(|s| s.total_key_size as usize)
.sum();

let total_value_size: usize = self
.table_stats
.values()
.map(|s| s.total_value_size as usize)
.sum();

(
total_key_size / total_key_count,
total_value_size / total_key_count,
)
}
};

let (min_epoch, max_epoch) = {
if self.epoch_set.is_empty() {
(u64::MAX, u64::MIN)
} else {
(
*self.epoch_set.first().unwrap(),
*self.epoch_set.last().unwrap(),
)
}
};

let sst_info = SstableInfo {
id: self.sstable_id,
key_range: Some(risingwave_pb::hummock::KeyRange {
Expand All @@ -377,43 +417,29 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
total_key_count: self.total_key_count,
divide_version: 0,
uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
min_epoch: cmp::min(self.min_epoch, tombstone_min_epoch),
max_epoch: cmp::max(self.max_epoch, tombstone_max_epoch),
min_epoch: cmp::min(min_epoch, tombstone_min_epoch),
max_epoch: cmp::max(max_epoch, tombstone_max_epoch),
};
tracing::trace!(
"meta_size {} bloom_filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {}",
"meta_size {} bloom_filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
meta.encoded_size(),
meta.bloom_filter.len(),
self.total_key_count,
self.stale_key_count,
self.min_epoch,
self.max_epoch,
min_epoch,
max_epoch,
self.epoch_set.len()
);
let bloom_filter_size = meta.bloom_filter.len();
let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
(0, 0)
} else {
let avg_key_size = self
.table_stats
.values()
.map(|s| s.total_key_size as usize)
.sum::<usize>()
/ self.table_stats.len();
let avg_value_size = self
.table_stats
.values()
.map(|s| s.total_value_size as usize)
.sum::<usize>()
/ self.table_stats.len();
(avg_key_size, avg_value_size)
};

let writer_output = self.writer.finish(meta).await?;
Ok(SstableBuilderOutput::<W::Output> {
sst_info: LocalSstableInfo::with_stats(sst_info, self.table_stats),
bloom_filter_size,
writer_output,
avg_key_size,
avg_value_size,
epoch_count: self.epoch_set.len(),
})
}

Expand Down
6 changes: 6 additions & 0 deletions src/storage/src/hummock/sstable/multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ where
.sstable_avg_value_size
.observe(builder_output.avg_value_size as _);
}

if builder_output.epoch_count != 0 {
self.compactor_metrics
.sstable_distinct_epoch_count
.observe(builder_output.epoch_count as _);
}
}
self.sst_outputs.push(SplitTableOutput {
upload_join_handle: builder_output.writer_output,
Expand Down
13 changes: 12 additions & 1 deletion src/storage/src/monitor/compactor_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct CompactorMetrics {
pub sstable_avg_value_size: Histogram,
pub iter_scan_key_counts: GenericCounterVec<AtomicU64>,
pub write_build_l0_bytes: GenericCounter<AtomicU64>,
pub sstable_distinct_epoch_count: Histogram,
}

impl CompactorMetrics {
Expand Down Expand Up @@ -171,7 +172,7 @@ impl CompactorMetrics {
let opts = histogram_opts!(
"compactor_sstable_avg_value_size",
"Total bytes gotten from sstable_avg_value_size, for observing sstable_avg_value_size",
exponential_buckets(1.0, 2.0, 25).unwrap() // max 16MB
exponential_buckets(1.0, 2.0, 26).unwrap() // max 32MB
);

let sstable_avg_value_size = register_histogram_with_registry!(opts, registry).unwrap();
Expand All @@ -198,6 +199,15 @@ impl CompactorMetrics {
registry
).unwrap();

let opts = histogram_opts!(
"compactor_sstable_distinct_epoch_count",
"Total number gotten from sstable_distinct_epoch_count, for observing sstable_distinct_epoch_count",
exponential_buckets(1.0, 2.0, 17).unwrap()
);

let sstable_distinct_epoch_count =
register_histogram_with_registry!(opts, registry).unwrap();

Self {
compaction_upload_sst_counts,
compact_write_bytes,
Expand All @@ -219,6 +229,7 @@ impl CompactorMetrics {
sstable_avg_value_size,
iter_scan_key_counts,
write_build_l0_bytes,
sstable_distinct_epoch_count,
}
}

Expand Down

0 comments on commit f12b263

Please sign in to comment.