Commit
feat(metrics): add metrics for barrier latency at each stage (risingwavelabs#3965)

* add metrics

* remove

* add doc

* add docs

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
2 people authored and nasnoisaac committed Aug 9, 2022
1 parent 2094f0b commit 7ab90e9
Showing 15 changed files with 180 additions and 60 deletions.
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions grafana/risingwave-dashboard.py
@@ -383,6 +383,34 @@ def section_streaming(panels):
"rate(meta_barrier_duration_seconds_sum[$__rate_interval]) / rate(meta_barrier_duration_seconds_count[$__rate_interval])", "barrier_latency_avg"
),
]),
panels.timeseries_latency(
"Barrier In-Flight Latency",
quantile(lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate(stream_barrier_inflight_duration_seconds_bucket[$__rate_interval])) by (le))", f"barrier_inflight_latency_p{legend}"
), [50, 90, 99, 999, "max"]) + [
panels.target(
"max(sum by(le, instance)(rate(stream_barrier_inflight_duration_seconds_sum[$__rate_interval])) / sum by(le, instance)(rate(stream_barrier_inflight_duration_seconds_count[$__rate_interval])))", "barrier_inflight_latency_avg"
),
]),
panels.timeseries_latency(
"Barrier Sync Latency",

quantile(lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate(stream_barrier_sync_storage_duration_seconds_bucket[$__rate_interval])) by (le,instance))", f"barrier_sync_latency_p{legend}"+" - computer @ {{instance}}"
), [50, 90, 99, 999, "max"]) + [
panels.target(
"sum by(le, instance)(rate(stream_barrier_sync_storage_duration_seconds_sum[$__rate_interval])) / sum by(le, instance)(rate(stream_barrier_sync_storage_duration_seconds_count[$__rate_interval]))", "barrier_sync_latency_avg - computer @ {{instance}}"
),
]),
panels.timeseries_latency(
"Barrier Wait Commit Latency",
quantile(lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate(meta_barrier_wait_commit_duration_seconds_bucket[$__rate_interval])) by (le))", f"barrier_wait_commit_latency_p{legend}"
), [50, 90, 99, 999, "max"]) + [
panels.target(
"rate(meta_barrier_wait_commit_duration_seconds_sum[$__rate_interval]) / rate(meta_barrier_wait_commit_duration_seconds_count[$__rate_interval])", "barrier_wait_commit_avg"
),
]),
panels.timeseries_rowsps("Source Throughput", [
panels.target(
"rate(stream_source_output_rows_counts[$__rate_interval])", "source={{source_id}} @ {{instance}}"
@@ -790,6 +818,11 @@ def section_hummock(panels):
"sum(rate(state_store_shared_buffer_to_sstable_size_sum[$__rate_interval]))by(job,instance) / sum(rate(state_store_shared_buffer_to_sstable_size_count[$__rate_interval]))by(job,instance)", "sync - {{job}} @ {{instance}}"
),
]),
panels.timeseries_bytes("Checkpoint Sync Size", [
panels.target(
"sum by(le, job, instance) (rate(state_store_write_l0_size_per_epoch_sum[$__rate_interval])) / sum by(le, job, instance) (rate(state_store_write_l0_size_per_epoch_count[$__rate_interval]))", "avg - {{job}} @ {{instance}}"
),
]),
panels.timeseries_bytes("Cache Size", [
panels.target(
"avg(state_store_meta_cache_size) by (job,instance)", "meta cache - {{job}} @ {{instance}}"
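Taken together, the three new panels break the existing barrier_latency panel's end-to-end measurement into stages: in-flight (from injection until the barrier is collected), sync (flushing state to storage), and wait-commit (from completion until the epoch is committed). Each `*_avg` target uses the usual Prometheus idiom for a windowed mean, `rate(<metric>_sum[w]) / rate(<metric>_count[w])`, over the same `$__rate_interval` window as the quantiles.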
47 changes: 26 additions & 21 deletions src/meta/src/barrier/mod.rs
@@ -222,19 +222,22 @@ struct CheckpointControl<S: MetaStore> {
adding_actors: HashSet<ActorId>,
/// The barrier does not send or collect these actors, even if they are `Running`.
removing_actors: HashSet<ActorId>,

metrics: Arc<MetaMetrics>,
}

impl<S> CheckpointControl<S>
where
S: MetaStore,
{
fn new() -> Self {
fn new(metrics: Arc<MetaMetrics>) -> Self {
Self {
command_ctx_queue: Default::default(),
creating_tables: Default::default(),
dropping_tables: Default::default(),
adding_actors: Default::default(),
removing_actors: Default::default(),
metrics,
}
}

@@ -315,26 +318,25 @@
}
}

/// Return the nums of barrier (the nums of in-flight-barrier, the nums of all-barrier).
fn get_barrier_len(&self) -> (usize, usize) {
(
/// Update the metrics of barrier nums.
fn update_barrier_nums_metrics(&self) {
self.metrics.in_flight_barrier_nums.set(
self.command_ctx_queue
.iter()
.filter(|x| matches!(x.state, InFlight))
.count(),
self.command_ctx_queue.len(),
)
.count() as i64,
);
self.metrics
.all_barrier_nums
.set(self.command_ctx_queue.len() as i64);
}

/// Inject a `command_ctx` in `command_ctx_queue`, and its state is `InFlight`.
fn inject(
&mut self,
command_ctx: Arc<CommandContext<S>>,
notifiers: SmallVec<[Notifier; 1]>,
timer: HistogramTimer,
) {
fn inject(&mut self, command_ctx: Arc<CommandContext<S>>, notifiers: SmallVec<[Notifier; 1]>) {
let timer = self.metrics.barrier_latency.start_timer();
self.command_ctx_queue.push_back(EpochNode {
timer: Some(timer),
wait_commit_timer: None,
state: InFlight,
command_ctx,
notifiers,
@@ -349,12 +351,14 @@
result: Result<Vec<BarrierCompleteResponse>>,
) -> Vec<EpochNode<S>> {
// change state to complete, and wait for nodes with the smaller epoch to commit
let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer();
if let Some(node) = self
.command_ctx_queue
.iter_mut()
.find(|x| x.command_ctx.prev_epoch.0 == prev_epoch)
{
assert!(matches!(node.state, InFlight));
node.wait_commit_timer = Some(wait_commit_timer);
node.state = Completed(result);
};
// Find all continuous nodes with 'Complete' starting from first node
@@ -430,6 +434,8 @@
pub struct EpochNode<S: MetaStore> {
/// Timer for recording barrier latency, taken after `complete_barriers`.
timer: Option<HistogramTimer>,
/// The timer of `barrier_wait_commit_latency`
wait_commit_timer: Option<HistogramTimer>,
/// Whether this barrier is in-flight or completed.
state: BarrierEpochState,
/// Context of this command to generate barrier and do some post jobs.
@@ -536,7 +542,7 @@
min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut barrier_timer: Option<HistogramTimer> = None;
let (barrier_complete_tx, mut barrier_complete_rx) = tokio::sync::mpsc::unbounded_channel();
let mut checkpoint_control = CheckpointControl::new();
let mut checkpoint_control = CheckpointControl::new(self.metrics.clone());
loop {
tokio::select! {
biased;
@@ -546,11 +552,7 @@
return;
}
result = barrier_complete_rx.recv() => {
let (in_flight_nums, all_nums) = checkpoint_control.get_barrier_len();
self.metrics
.in_flight_barrier_nums
.set(in_flight_nums as i64);
self.metrics.all_barrier_nums.set(all_nums as i64);
checkpoint_control.update_barrier_nums_metrics();

let (prev_epoch, result) = result.unwrap();
self.barrier_complete_and_commit(
@@ -610,8 +612,7 @@
));
let mut notifiers = notifiers;
notifiers.iter_mut().for_each(Notifier::notify_to_send);
let timer = self.metrics.barrier_latency.start_timer();
checkpoint_control.inject(command_ctx.clone(), notifiers, timer);
checkpoint_control.inject(command_ctx.clone(), notifiers);

self.inject_and_send_err(command_ctx, barrier_complete_tx.clone())
.await;
@@ -764,6 +765,9 @@
if let Some(timer) = node.timer {
timer.observe_duration();
}
if let Some(wait_commit_timer) = node.wait_commit_timer {
wait_commit_timer.observe_duration();
}
node.notifiers
.into_iter()
.for_each(|notifier| notifier.notify_collection_failed(err.clone()));
@@ -831,6 +835,7 @@
}

node.timer.take().unwrap().observe_duration();
node.wait_commit_timer.take().unwrap().observe_duration();
node.command_ctx.post_collect().await?;

// Notify about collected first.
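For context on the timer plumbing above: `Histogram::start_timer()` in the prometheus crate returns a `HistogramTimer` that records the elapsed time when `observe_duration()` is called, or on drop if it is never stopped explicitly. A minimal, self-contained sketch of that lifecycle (the metric name `demo_duration_seconds` is illustrative, not from this commit):

```rust
use prometheus::{histogram_opts, Histogram, HistogramTimer, Registry};

fn main() {
    let registry = Registry::new();
    let histogram =
        Histogram::with_opts(histogram_opts!("demo_duration_seconds", "demo help")).unwrap();
    registry.register(Box::new(histogram.clone())).unwrap();

    // Corresponds to `inject`: start timing and stash the handle,
    // as the queue node does with `timer: Some(timer)`.
    let timer: HistogramTimer = histogram.start_timer();

    // ... barrier is in flight ...

    // Corresponds to the commit path: stop and record the duration.
    timer.observe_duration();
}
```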
7 changes: 5 additions & 2 deletions src/meta/src/barrier/recovery.rs
@@ -62,8 +62,11 @@
}

async fn resolve_actor_info_for_recovery(&self) -> BarrierActorInfo {
self.resolve_actor_info(&mut CheckpointControl::new(), &Command::checkpoint())
.await
self.resolve_actor_info(
&mut CheckpointControl::new(self.metrics.clone()),
&Command::checkpoint(),
)
.await
}

/// Recover the whole cluster from the latest epoch.
14 changes: 13 additions & 1 deletion src/meta/src/rpc/metrics.rs
@@ -23,8 +23,11 @@ pub struct MetaMetrics {

/// gRPC latency of meta services
pub grpc_latency: HistogramVec,
/// latency of each barrier
/// The duration from barrier injection to commit
/// It is the sum of inflight-latency, sync-latency and wait-commit-latency.
pub barrier_latency: Histogram,
/// The duration from barrier complete to commit
pub barrier_wait_commit_latency: Histogram,

/// latency between each barrier send
pub barrier_send_latency: Histogram,
@@ -72,6 +75,14 @@ impl MetaMetrics {
);
let barrier_latency = register_histogram_with_registry!(opts, registry).unwrap();

let opts = histogram_opts!(
"meta_barrier_wait_commit_duration_seconds",
"barrier_wait_commit_latency",
exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
);
let barrier_wait_commit_latency =
register_histogram_with_registry!(opts, registry).unwrap();

let opts = histogram_opts!(
"meta_barrier_send_duration_seconds",
"barrier send latency",
@@ -153,6 +164,7 @@ impl MetaMetrics {

grpc_latency,
barrier_latency,
barrier_wait_commit_latency,
barrier_send_latency,
all_barrier_nums,
in_flight_barrier_nums,
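A quick check of the `// max 43s` comment: `exponential_buckets(0.1, 1.5, 16)` yields 16 bucket bounds of the form 0.1 · 1.5^i for i = 0..=15, so the largest finite bound is 0.1 · 1.5^15 ≈ 43.8 s. A sketch that reproduces the arithmetic:

```rust
fn main() {
    // Same shape as prometheus::exponential_buckets(0.1, 1.5, 16).
    let buckets: Vec<f64> = (0..16).map(|i| 0.1 * 1.5f64.powi(i)).collect();
    let max = buckets.last().copied().unwrap();
    assert!((max - 43.789).abs() < 0.01); // 0.1 * 1.5^15 ≈ 43.8 s
    println!("largest bucket bound: {max:.3} s");
}
```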
18 changes: 9 additions & 9 deletions src/storage/src/hummock/local_version_manager.rs
@@ -469,7 +469,7 @@ impl LocalVersionManager {
Some((epoch, join_handle))
}

pub async fn sync_shared_buffer(&self, epoch: Option<HummockEpoch>) -> HummockResult<()> {
pub async fn sync_shared_buffer(&self, epoch: Option<HummockEpoch>) -> HummockResult<usize> {
let epochs = match epoch {
Some(epoch) => vec![epoch],
None => self
@@ -479,13 +479,14 @@
.map(|(epoch, _)| *epoch)
.collect(),
};
let mut size = 0;
for epoch in epochs {
self.sync_shared_buffer_epoch(epoch).await?;
size += self.sync_shared_buffer_epoch(epoch).await?
}
Ok(())
Ok(size)
}

pub async fn sync_shared_buffer_epoch(&self, epoch: HummockEpoch) -> HummockResult<()> {
pub async fn sync_shared_buffer_epoch(&self, epoch: HummockEpoch) -> HummockResult<usize> {
tracing::trace!("sync epoch {}", epoch);
let (tx, rx) = oneshot::channel();
self.buffer_tracker
@@ -503,13 +504,12 @@
Some(task) => task,
None => {
tracing::trace!("sync epoch {} has no more task to do", epoch);
return Ok(());
return Ok(0);
}
};

let ret = self
.run_upload_task(order_index, epoch, task_payload, false)
.await;
self.run_upload_task(order_index, epoch, task_payload, false)
.await?;
tracing::trace!(
"sync epoch {} finished. Task size {}",
epoch,
@@ -520,7 +520,7 @@
}
self.buffer_tracker
.send_event(SharedBufferEvent::EpochSynced(epoch));
ret
Ok(task_write_batch_size)
}

async fn run_upload_task(
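One nuance of the rewrite above, as far as the visible hunks show: the old code captured the upload result in `ret` and sent the `EpochSynced` event regardless of the outcome, whereas the new `?` on `run_upload_task` returns early on failure, so `EpochSynced` is now sent only after a successful upload.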
5 changes: 3 additions & 2 deletions src/storage/src/hummock/state_store.rs
@@ -479,10 +479,11 @@ impl StateStore for HummockStorage {

fn sync(&self, epoch: Option<u64>) -> Self::SyncFuture<'_> {
async move {
self.local_version_manager()
let size = self
.local_version_manager()
.sync_shared_buffer(epoch)
.await?;
Ok(())
Ok(size)
}
}

2 changes: 1 addition & 1 deletion src/storage/src/memory.rs
@@ -235,7 +235,7 @@ impl StateStore for MemoryStateStore {
fn sync(&self, _epoch: Option<u64>) -> Self::SyncFuture<'_> {
async move {
// memory backend doesn't support push to S3, so this is a no-op
Ok(())
Ok(0)
}
}

8 changes: 6 additions & 2 deletions src/storage/src/monitor/monitored_store.rs
@@ -227,12 +227,16 @@
fn sync(&self, epoch: Option<u64>) -> Self::SyncFuture<'_> {
async move {
let timer = self.stats.shared_buffer_to_l0_duration.start_timer();
self.inner
let size = self
.inner
.sync(epoch)
.await
.inspect_err(|e| error!("Failed in sync: {:?}", e))?;
timer.observe_duration();
Ok(())
if size != 0 {
self.stats.write_l0_size_per_epoch.observe(size as _);
}
Ok(size)
}
}

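The change follows the monitored-wrapper pattern used throughout this file: time the inner call, propagate errors, then record the returned size, skipping zero-sized syncs so empty epochs don't pile up in the lowest bucket. A stripped-down sketch of the pattern (the names `monitored_sync`, `duration`, and `size_hist` are illustrative, not from the codebase):

```rust
use prometheus::Histogram;
use std::future::Future;

async fn monitored_sync<F, E>(
    inner: F,
    duration: &Histogram,
    size_hist: &Histogram,
) -> Result<usize, E>
where
    F: Future<Output = Result<usize, E>>,
{
    let timer = duration.start_timer();
    let size = inner.await?; // on Err, the timer still observes when dropped
    timer.observe_duration();
    if size != 0 {
        size_hist.observe(size as f64);
    }
    Ok(size)
}
```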
10 changes: 10 additions & 0 deletions src/storage/src/monitor/state_store_metrics.rs
@@ -55,6 +55,7 @@ macro_rules! for_all_metrics {
write_batch_size: Histogram,
write_build_l0_sst_duration: Histogram,
write_build_l0_bytes: GenericCounter<AtomicU64>,
write_l0_size_per_epoch: Histogram,

iter_merge_sstable_counts: HistogramVec,

@@ -248,6 +249,14 @@ impl StateStoreMetrics {
"Total size of compaction files size that have been written to object store from shared buffer",
registry
).unwrap();

let opts = histogram_opts!(
"state_store_write_l0_size_per_epoch",
"Total size of upload to l0 every epoch",
exponential_buckets(10.0, 2.0, 25).unwrap()
);
let write_l0_size_per_epoch = register_histogram_with_registry!(opts, registry).unwrap();

let opts = histogram_opts!(
"state_store_shared_buffer_to_l0_duration",
"Histogram of time spent from compacting shared buffer to remote storage",
@@ -401,6 +410,7 @@ impl StateStoreMetrics {
write_batch_size,
write_build_l0_sst_duration,
write_build_l0_bytes,
write_l0_size_per_epoch,
iter_merge_sstable_counts,
sst_store_block_request_counts,
shared_buffer_to_l0_duration,
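For scale: `exponential_buckets(10.0, 2.0, 25)` gives the new size histogram bounds from 10 up to 10 · 2^24 ≈ 1.7 × 10^8, i.e. roughly 10 B to 160 MiB per epoch, assuming the observed value (the synced size returned by the storage layer) is in bytes.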
5 changes: 3 additions & 2 deletions src/storage/src/store.rs
@@ -28,6 +28,7 @@ use crate::write_batch::WriteBatch;
pub trait GetFutureTrait<'a> = Future<Output = StorageResult<Option<Bytes>>> + Send;
pub trait ScanFutureTrait<'a, R, B> = Future<Output = StorageResult<Vec<(Bytes, Bytes)>>> + Send;
pub trait EmptyFutureTrait<'a> = Future<Output = StorageResult<()>> + Send;
pub trait SyncFutureTrait<'a> = Future<Output = StorageResult<usize>> + Send;
pub trait IngestBatchFutureTrait<'a> = Future<Output = StorageResult<usize>> + Send;

#[macro_export]
@@ -46,7 +47,7 @@ macro_rules! define_state_store_associated_type {
type IngestBatchFuture<'a> = impl IngestBatchFutureTrait<'a>;
type ReplicateBatchFuture<'a> = impl EmptyFutureTrait<'a>;
type WaitEpochFuture<'a> = impl EmptyFutureTrait<'a>;
type SyncFuture<'a> = impl EmptyFutureTrait<'a>;
type SyncFuture<'a> = impl SyncFutureTrait<'a>;
type IterFuture<'a, R, B> = impl Future<Output = $crate::error::StorageResult<Self::Iter>> + Send
where
R: 'static + Send + RangeBounds<B>,
@@ -80,7 +81,7 @@ pub trait StateStore: Send + Sync + 'static + Clone {

type WaitEpochFuture<'a>: EmptyFutureTrait<'a>;

type SyncFuture<'a>: EmptyFutureTrait<'a>;
type SyncFuture<'a>: SyncFutureTrait<'a>;

type IterFuture<'a, R, B>: Future<Output = StorageResult<Self::Iter>> + Send
where
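The net effect of the trait change: `sync` previously resolved to `StorageResult<()>` via `EmptyFutureTrait`, while the new `SyncFutureTrait` has every backend report the size it synced (a `usize`, apparently bytes), which is what the memory backend's `Ok(0)` and the monitored store's `size` plumbing above satisfy.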
(Diffs for the remaining 4 changed files were not loaded.)