Skip to content

Commit

Permalink
Record perf contexts to the write procedure
Browse files Browse the repository at this point in the history
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
  • Loading branch information
sticnarf committed Jun 23, 2022
1 parent a950ec5 commit 5044f50
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 27 deletions.
23 changes: 16 additions & 7 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::metrics::*;
use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog};
use crate::purge::{PurgeHook, PurgeManager};
use crate::write_barrier::{WriteBarrier, Writer};
use crate::{Error, GlobalStats, Result};
use crate::{perf_context, Error, GlobalStats, Result};

const METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(30);

Expand Down Expand Up @@ -143,8 +143,10 @@ where
let block_handle = {
let mut writer = Writer::new(log_batch, sync, start);
if let Some(mut group) = self.write_barrier.enter(&mut writer) {
let init_perf_context = get_perf_context().with(|pc| pc.take());
let now = Instant::now();
let _t = StopWatch::new_with(&ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
let _t = StopWatch::new_with(&*ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
let append_watch = StopWatch::new_with(perf_context!(log_write_nanos), now);
for writer in group.iter_mut() {
ENGINE_WRITE_PREPROCESS_DURATION_HISTOGRAM.observe(
now.saturating_duration_since(writer.start_time)
Expand All @@ -166,14 +168,20 @@ where
};
writer.set_output(res);
}
drop(append_watch);
if let Err(e) = self.pipe_log.maybe_sync(LogQueue::Append, sync) {
panic!(
"Cannot sync {:?} queue due to IO error: {}",
LogQueue::Append,
e
);
}
let diff = get_perf_context().with(|pc| pc.replace(init_perf_context));
for writer in group.iter_mut() {
writer.perf_context_diff = diff.clone();
}
}
get_perf_context().with(|pc| *pc.borrow_mut() += &writer.perf_context_diff);
writer.finish()?
};

Expand All @@ -185,8 +193,9 @@ where
listener.post_apply_memtables(block_handle.id);
}
let end = Instant::now();
ENGINE_WRITE_APPLY_DURATION_HISTOGRAM
.observe(end.saturating_duration_since(now).as_secs_f64());
let apply_duration = end.saturating_duration_since(now);
ENGINE_WRITE_APPLY_DURATION_HISTOGRAM.observe(apply_duration.as_secs_f64());
perf_context!(write_apply_nanos).observe(apply_duration);
now = end;
}
ENGINE_WRITE_DURATION_HISTOGRAM.observe(now.saturating_duration_since(start).as_secs_f64());
Expand All @@ -201,7 +210,7 @@ where
}

pub fn get_message<S: Message>(&self, region_id: u64, key: &[u8]) -> Result<Option<S>> {
let _t = StopWatch::new(&ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
let _t = StopWatch::new(&*ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
if let Some(memtable) = self.memtables.get(region_id) {
if let Some(value) = memtable.read().get(key) {
return Ok(Some(parse_from_bytes(&value)?));
Expand All @@ -215,7 +224,7 @@ where
region_id: u64,
log_idx: u64,
) -> Result<Option<M::Entry>> {
let _t = StopWatch::new(&ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
if let Some(memtable) = self.memtables.get(region_id) {
if let Some(idx) = memtable.read().get_entry(log_idx) {
ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(1.0);
Expand Down Expand Up @@ -243,7 +252,7 @@ where
max_size: Option<usize>,
vec: &mut Vec<M::Entry>,
) -> Result<usize> {
let _t = StopWatch::new(&ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
if let Some(memtable) = self.memtables.get(region_id) {
let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity((end - begin) as usize);
memtable
Expand Down
9 changes: 6 additions & 3 deletions src/file_pipe_log/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use log::warn;
use crate::env::{FileSystem, Handle, WriteExt};
use crate::metrics::*;
use crate::pipe_log::FileBlockHandle;
use crate::{Error, Result};
use crate::{perf_context, Error, Result};

use super::format::{LogFileFormat, Version};

Expand Down Expand Up @@ -91,7 +91,7 @@ impl<F: FileSystem> LogFileWriter<F> {
pub fn write(&mut self, buf: &[u8], target_size_hint: usize) -> Result<()> {
let new_written = self.written + buf.len();
if self.capacity < new_written {
let _t = StopWatch::new(&LOG_ALLOCATE_DURATION_HISTOGRAM);
let _t = StopWatch::new(&*LOG_ALLOCATE_DURATION_HISTOGRAM);
let alloc = std::cmp::max(
new_written - self.capacity,
std::cmp::min(
Expand All @@ -111,7 +111,10 @@ impl<F: FileSystem> LogFileWriter<F> {

pub fn sync(&mut self) -> Result<()> {
if self.last_sync < self.written {
let _t = StopWatch::new(&LOG_SYNC_DURATION_HISTOGRAM);
let _t = StopWatch::new((
&*LOG_SYNC_DURATION_HISTOGRAM,
perf_context!(log_rotate_nanos),
));
self.writer.sync()?;
self.last_sync = self.written;
}
Expand Down
7 changes: 5 additions & 2 deletions src/file_pipe_log/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::env::FileSystem;
use crate::event_listener::EventListener;
use crate::metrics::*;
use crate::pipe_log::{FileBlockHandle, FileId, FileSeq, LogQueue, PipeLog};
use crate::{Error, Result};
use crate::{perf_context, Error, Result};

use super::format::{FileNameExt, Version};
use super::log_file::{build_file_reader, build_file_writer, FileHandler, LogFileWriter};
Expand Down Expand Up @@ -154,7 +154,10 @@ impl<F: FileSystem> SinglePipe<F> {
///
/// This operation is atomic in face of errors.
fn rotate_imp(&self, active_file: &mut MutexGuard<ActiveFile<F>>) -> Result<()> {
let _t = StopWatch::new(&LOG_ROTATE_DURATION_HISTOGRAM);
let _t = StopWatch::new((
&*LOG_ROTATE_DURATION_HISTOGRAM,
perf_context!(log_rotate_nanos),
));
let seq = active_file.seq + 1;
debug_assert!(seq > 1);

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub use config::{Config, RecoveryMode};
pub use engine::Engine;
pub use errors::{Error, Result};
pub use log_batch::{Command, LogBatch, MessageExt};
pub use metrics::{get_perf_context, reset_perf_context, PerfContext};
pub use util::ReadableSize;

#[cfg(feature = "internals")]
Expand Down
4 changes: 3 additions & 1 deletion src/log_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use protobuf::Message;
use crate::codec::{self, NumberEncoder};
use crate::file_pipe_log::Version;
use crate::memtable::EntryIndex;
use crate::metrics::StopWatch;
use crate::pipe_log::{FileBlockHandle, FileId};
use crate::util::{crc32, lz4};
use crate::{Error, Result};
use crate::{perf_context, Error, Result};

pub(crate) const LOG_BATCH_HEADER_LEN: usize = 16;
pub(crate) const LOG_BATCH_CHECKSUM_LEN: usize = 4;
Expand Down Expand Up @@ -697,6 +698,7 @@ impl LogBatch {
/// Internally, encodes and optionally compresses log entries. Sets the
/// compression type to each entry index.
pub(crate) fn finish_populate(&mut self, compression_threshold: usize) -> Result<usize> {
let _t = StopWatch::new(perf_context!(log_populating_nanos));
debug_assert!(self.buf_state == BufState::Open);
if self.is_empty() {
self.buf_state = BufState::Sealed(self.buf.len(), 0);
Expand Down
126 changes: 115 additions & 11 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,140 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

use std::time::Instant;
use std::{
cell::{RefCell, RefMut},
ops::AddAssign,
thread::LocalKey,
time::{Duration, Instant},
};

use prometheus::*;
use prometheus_static_metric::*;

use crate::util::InstantExt;

pub struct StopWatch<'a> {
histogram: &'a Histogram,
pub struct StopWatch<M: TimeMetric> {
metric: M,
start: Instant,
}

impl<'a> StopWatch<'a> {
impl<M: TimeMetric> StopWatch<M> {
#[inline]
pub fn new(histogram: &'a Histogram) -> Self {
pub fn new(metric: M) -> Self {
Self {
histogram,
metric,
start: Instant::now(),
}
}

#[inline]
pub fn new_with(histogram: &'a Histogram, start: Instant) -> Self {
Self { histogram, start }
pub fn new_with(metric: M, start: Instant) -> Self {
Self { metric, start }
}
}

impl<'a> Drop for StopWatch<'a> {
impl<M: TimeMetric> Drop for StopWatch<M> {
fn drop(&mut self) {
self.histogram
.observe(self.start.saturating_elapsed().as_secs_f64());
self.metric.observe(self.start.saturating_elapsed());
}
}

/// PerfContext records cumulative performance statistics of operations.
///
/// Raft Engine will update the data in the thread-local PerfContext whenever
/// an opeartion is performed.
#[derive(Debug, Clone, Default)]
pub struct PerfContext {
/// Time spent encoding and compressing log entries.
pub log_populating_nanos: u64,

/// Time spent waiting for becoming the write leader.
pub write_leader_wait_nanos: u64,

/// Time spent writing the logs to files.
pub log_write_nanos: u64,

/// Time spent rotating the active log file.
pub log_rotate_nanos: u64,

// Time spent synchronizing logs to the disk.
pub log_sync_nanos: u64,

// Time spent applying the appended logs.
pub write_apply_nanos: u64,
}

impl AddAssign<&'_ PerfContext> for PerfContext {
fn add_assign(&mut self, rhs: &PerfContext) {
self.log_populating_nanos += rhs.log_populating_nanos;
self.write_leader_wait_nanos += rhs.write_leader_wait_nanos;
self.log_write_nanos += rhs.log_write_nanos;
self.log_rotate_nanos += rhs.log_rotate_nanos;
self.log_sync_nanos += rhs.log_sync_nanos;
self.write_apply_nanos += rhs.write_apply_nanos;
}
}

thread_local! {
static TLS_PERF_CONTEXT: RefCell<PerfContext> = RefCell::new(PerfContext::default());
}

pub fn reset_perf_context() {
TLS_PERF_CONTEXT.with(|c| *c.borrow_mut() = PerfContext::default());
}

pub fn get_perf_context() -> &'static LocalKey<RefCell<PerfContext>> {
&TLS_PERF_CONTEXT
}

pub(crate) struct PerfContextField<P> {
projector: P,
}

impl<P> PerfContextField<P>
where
P: Fn(&mut PerfContext) -> &mut u64,
{
pub fn new(projector: P) -> Self {
PerfContextField { projector }
}
}

#[macro_export(crate)]
macro_rules! perf_context {
($field: ident) => {
$crate::metrics::PerfContextField::new(|perf_context| &mut perf_context.$field)
};
}

pub trait TimeMetric {
fn observe(&self, duration: Duration);
}

impl<'a> TimeMetric for &'a Histogram {
fn observe(&self, duration: Duration) {
Histogram::observe(self, duration.as_secs_f64());
}
}

impl<P> TimeMetric for PerfContextField<P>
where
P: Fn(&mut PerfContext) -> &mut u64,
{
fn observe(&self, duration: Duration) {
TLS_PERF_CONTEXT.with(|perf_context| {
*RefMut::map(perf_context.borrow_mut(), &self.projector) += duration.as_nanos() as u64;
})
}
}

impl<M1, M2> TimeMetric for (M1, M2)
where
M1: TimeMetric,
M2: TimeMetric,
{
fn observe(&self, duration: Duration) {
self.0.observe(duration);
self.1.observe(duration);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ where
}

pub fn purge_expired_files(&self) -> Result<Vec<u64>> {
let _t = StopWatch::new(&ENGINE_PURGE_DURATION_HISTOGRAM);
let _t = StopWatch::new(&*ENGINE_PURGE_DURATION_HISTOGRAM);
let guard = self.force_rewrite_candidates.try_lock();
if guard.is_none() {
warn!("Unable to purge expired files: locked");
Expand Down Expand Up @@ -202,7 +202,7 @@ where
compact_watermark: FileSeq,
rewrite_candidates: &mut HashMap<u64, u32>,
) -> Result<Vec<u64>> {
let _t = StopWatch::new(&ENGINE_REWRITE_APPEND_DURATION_HISTOGRAM);
let _t = StopWatch::new(&*ENGINE_REWRITE_APPEND_DURATION_HISTOGRAM);
debug_assert!(compact_watermark <= rewrite_watermark);
let mut should_compact = Vec::with_capacity(16);

Expand Down Expand Up @@ -239,7 +239,7 @@ where

// Rewrites the entire rewrite queue into new log files.
fn rewrite_rewrite_queue(&self) -> Result<Vec<u64>> {
let _t = StopWatch::new(&ENGINE_REWRITE_REWRITE_DURATION_HISTOGRAM);
let _t = StopWatch::new(&*ENGINE_REWRITE_REWRITE_DURATION_HISTOGRAM);
self.pipe_log.rotate(LogQueue::Rewrite)?;

let mut force_compact_regions = vec![];
Expand Down
5 changes: 5 additions & 0 deletions src/write_barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use std::time::Instant;
use fail::fail_point;
use parking_lot::{Condvar, Mutex};

use crate::{metrics::StopWatch, perf_context, PerfContext};

type Ptr<T> = Option<NonNull<T>>;

///
Expand All @@ -23,6 +25,7 @@ pub struct Writer<P, O> {

pub(crate) sync: bool,
pub(crate) start_time: Instant,
pub(crate) perf_context_diff: PerfContext,
}

impl<P, O> Writer<P, O> {
Expand All @@ -39,6 +42,7 @@ impl<P, O> Writer<P, O> {
output: None,
sync,
start_time,
perf_context_diff: PerfContext::default(),
}
}

Expand Down Expand Up @@ -165,6 +169,7 @@ impl<P, O> WriteBarrier<P, O> {
/// the leader of a set of writers, returns a [`WriteGroup`] that contains
/// them, `writer` included.
pub fn enter<'a>(&self, writer: &'a mut Writer<P, O>) -> Option<WriteGroup<'_, 'a, P, O>> {
let _t = StopWatch::new(perf_context!(write_leader_wait_nanos));
let node = unsafe { Some(NonNull::new_unchecked(writer)) };
let mut inner = self.inner.lock();
if let Some(tail) = inner.tail.get() {
Expand Down

0 comments on commit 5044f50

Please sign in to comment.