metrics/cache: add more prefetch related metrics
Record prefetch request average latency.
Calculate prefetch average bandwidth.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
changweige committed Nov 24, 2022
1 parent bbcb0bf commit 4c48504
Showing 2 changed files with 62 additions and 13 deletions.
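
The diff adds raw counters only; the average latency and bandwidth mentioned in the commit message are left to be derived by whatever consumes the exported metrics. A minimal sketch of that derivation, assuming a hypothetical PrefetchSnapshot struct populated from the counter values added below:

// Hypothetical snapshot of the counters this commit adds; in practice the
// values would come from the JSON produced by BlobcacheMetrics::export_metrics().
struct PrefetchSnapshot {
    prefetch_data_amount: u64,            // bytes of prefetch data requested from the backend
    prefetch_mr_count: u64,               // number of merged prefetch requests issued
    prefetch_cumulative_time_millis: u64, // summed per-request latency
    prefetch_begin_time_secs: u64,        // wall-clock start of prefetching
    prefetch_begin_time_millis: u64,      // sub-second part of the start time
    prefetch_end_time_secs: u64,          // wall-clock end of the last prefetch
    prefetch_end_time_millis: u64,        // sub-second part of the end time
}

impl PrefetchSnapshot {
    /// Average latency of one merged prefetch request, in milliseconds.
    fn avg_request_latency_millis(&self) -> f64 {
        if self.prefetch_mr_count == 0 {
            return 0.0;
        }
        self.prefetch_cumulative_time_millis as f64 / self.prefetch_mr_count as f64
    }

    /// Average prefetch bandwidth in bytes per second over the window spanned
    /// by the begin/end timestamps.
    fn avg_bandwidth_bytes_per_sec(&self) -> f64 {
        let begin = self.prefetch_begin_time_secs * 1000 + self.prefetch_begin_time_millis;
        let end = self.prefetch_end_time_secs * 1000 + self.prefetch_end_time_millis;
        let window = end.saturating_sub(begin);
        if window == 0 {
            return 0.0;
        }
        self.prefetch_data_amount as f64 * 1000.0 / window as f64
    }
}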
58 changes: 45 additions & 13 deletions storage/src/cache/worker.rs
@@ -6,9 +6,9 @@
use std::io::Result;
use std::num::NonZeroU32;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Once};
use std::thread;
use std::time::Duration;
use std::time::{Duration, SystemTime};

use governor::clock::QuantaClock;
use governor::state::{InMemoryState, NotKeyed};
@@ -25,6 +25,8 @@ use crate::cache::{BlobCache, BlobIoRange};
use crate::factory::ASYNC_RUNTIME;
use crate::RAFS_MAX_CHUNK_SIZE;

static PREFETCH_BEGIN_TIMING: Once = Once::new();

/// Configuration information for asynchronous workers.
pub(crate) struct AsyncPrefetchConfig {
/// Whether or not to enable prefetch.
@@ -51,9 +53,9 @@ impl From<BlobPrefetchConfig> for AsyncPrefetchConfig {
/// Asynchronous service request message.
pub(crate) enum AsyncPrefetchMessage {
/// Asynchronous blob layer prefetch request with (offset, size) of blob on storage backend.
BlobPrefetch(Arc<dyn BlobCache>, u64, u64),
BlobPrefetch(Arc<dyn BlobCache>, u64, u64, SystemTime),
/// Asynchronous file-system layer prefetch request.
FsPrefetch(Arc<dyn BlobCache>, BlobIoRange),
FsPrefetch(Arc<dyn BlobCache>, BlobIoRange, SystemTime),
#[cfg_attr(not(test), allow(unused))]
/// Ping for test.
Ping,
@@ -64,12 +66,12 @@ pub(crate) enum AsyncPrefetchMessage {
impl AsyncPrefetchMessage {
/// Create a new asynchronous filesystem prefetch request message.
pub fn new_fs_prefetch(blob_cache: Arc<dyn BlobCache>, req: BlobIoRange) -> Self {
AsyncPrefetchMessage::FsPrefetch(blob_cache, req)
AsyncPrefetchMessage::FsPrefetch(blob_cache, req, SystemTime::now())
}

/// Create a new asynchronous blob prefetch request message.
pub fn new_blob_prefetch(blob_cache: Arc<dyn BlobCache>, offset: u64, size: u64) -> Self {
AsyncPrefetchMessage::BlobPrefetch(blob_cache, offset, size)
AsyncPrefetchMessage::BlobPrefetch(blob_cache, offset, size, SystemTime::now())
}
}
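
Both constructors above stamp the request with SystemTime::now() at message creation, i.e. at enqueue time, so the latency later recorded against the request covers queueing delay as well as backend I/O. A minimal illustrative sketch of that measurement pattern (not code from this commit):

use std::time::SystemTime;

// Capture a timestamp when the request is enqueued and measure elapsed
// wall-clock time when the handler completes. duration_since() fails if the
// clock moved backwards, in which case nothing meaningful can be recorded.
fn elapsed_millis_since(begin_time: SystemTime) -> u64 {
    SystemTime::now()
        .duration_since(begin_time)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}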

@@ -173,10 +175,10 @@ impl AsyncWorkerMgr {
pub fn flush_pending_prefetch_requests(&self, blob_id: &str) {
self.prefetch_channel
.flush_pending_prefetch_requests(|t| match t {
AsyncPrefetchMessage::BlobPrefetch(blob, _, _) => {
AsyncPrefetchMessage::BlobPrefetch(blob, _, _, _) => {
blob_id == blob.blob_id() && !blob.is_prefetch_active()
}
AsyncPrefetchMessage::FsPrefetch(blob, _) => {
AsyncPrefetchMessage::FsPrefetch(blob, _, _) => {
blob_id == blob.blob_id() && !blob.is_prefetch_active()
}
_ => false,
@@ -232,6 +234,17 @@ impl AsyncWorkerMgr {
}

async fn handle_prefetch_requests(mgr: Arc<AsyncWorkerMgr>, rt: &Runtime) {
PREFETCH_BEGIN_TIMING.call_once(|| {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();

mgr.metrics.prefetch_begin_time_secs.set(now.as_secs());
mgr.metrics
.prefetch_begin_time_millis
.set(now.subsec_millis() as u64);
});

// Max 1 active request per thread.
mgr.prefetch_sema.add_permits(1);

@@ -240,7 +253,7 @@
let mgr2 = mgr.clone();

match msg {
AsyncPrefetchMessage::BlobPrefetch(blob_cache, offset, size) => {
AsyncPrefetchMessage::BlobPrefetch(blob_cache, offset, size, begin_time) => {
let token = Semaphore::acquire_owned(mgr2.prefetch_sema.clone())
.await
.unwrap();
@@ -251,19 +264,25 @@
blob_cache,
offset,
size,
begin_time,
);
drop(token);
});
}
}
AsyncPrefetchMessage::FsPrefetch(blob_cache, req) => {
AsyncPrefetchMessage::FsPrefetch(blob_cache, req, begin_time) => {
let token = Semaphore::acquire_owned(mgr2.prefetch_sema.clone())
.await
.unwrap();

if blob_cache.is_prefetch_active() {
rt.spawn_blocking(move || {
let _ = Self::handle_fs_prefetch_request(mgr2.clone(), blob_cache, req);
let _ = Self::handle_fs_prefetch_request(
mgr2.clone(),
blob_cache,
req,
begin_time,
);
drop(token)
});
}
Expand All @@ -282,14 +301,14 @@ impl AsyncWorkerMgr {
// Allocate network bandwidth budget
if let Some(limiter) = &self.prefetch_limiter {
let size = match msg {
AsyncPrefetchMessage::BlobPrefetch(blob_cache, _offset, size) => {
AsyncPrefetchMessage::BlobPrefetch(blob_cache, _offset, size, _) => {
if blob_cache.is_prefetch_active() {
*size
} else {
0
}
}
AsyncPrefetchMessage::FsPrefetch(blob_cache, req) => {
AsyncPrefetchMessage::FsPrefetch(blob_cache, req, _) => {
if blob_cache.is_prefetch_active() {
req.blob_size
} else {
Expand Down Expand Up @@ -321,6 +340,7 @@ impl AsyncWorkerMgr {
cache: Arc<dyn BlobCache>,
offset: u64,
size: u64,
begin_time: SystemTime,
) -> Result<()> {
trace!(
"storage: prefetch blob {} offset {} size {}",
@@ -332,6 +352,13 @@
return Ok(());
}

// Record how much prefetch data is requested from the storage backend, so
// the average merged backend request size can be computed as
// prefetch_data_amount / prefetch_mr_count, which indicates how well
// requests are being merged.
let metrics = mgr.metrics.clone();
metrics.prefetch_mr_count.inc();
metrics.prefetch_data_amount.add(size);

if let Some(obj) = cache.get_blob_object() {
if let Err(e) = obj.fetch_range_compressed(offset, size) {
warn!(
@@ -356,6 +383,8 @@
warn!("prefetch blob range is not supported");
}

metrics.calculate_prefetch_metrics(begin_time);

Ok(())
}

@@ -368,6 +397,7 @@
mgr: Arc<AsyncWorkerMgr>,
cache: Arc<dyn BlobCache>,
req: BlobIoRange,
begin_time: SystemTime,
) -> Result<()> {
let blob_offset = req.blob_offset;
let blob_size = req.blob_size;
@@ -393,6 +423,8 @@
cache.prefetch_range(&req)?;
}

mgr.metrics.calculate_prefetch_metrics(begin_time);

Ok(())
}

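The comment added in handle_blob_prefetch_request notes that the average merged backend request size follows from the two counters; a one-line sketch of that ratio, assuming the caller has already read both values from the exported metrics:

// Average size in bytes of a merged backend prefetch request; returns 0
// when no prefetch request has been issued yet, to avoid dividing by zero.
fn avg_merged_request_size(prefetch_data_amount: u64, prefetch_mr_count: u64) -> u64 {
    if prefetch_mr_count == 0 {
        0
    } else {
        prefetch_data_amount / prefetch_mr_count
    }
}
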
17 changes: 17 additions & 0 deletions utils/src/metrics.rs
@@ -742,6 +742,11 @@ pub struct BlobcacheMetrics {
pub prefetch_mr_count: BasicMetric,
pub prefetch_workers: AtomicUsize,
pub prefetch_unmerged_chunks: BasicMetric,
pub prefetch_cumulative_time_millis: BasicMetric,
pub prefetch_begin_time_secs: BasicMetric,
pub prefetch_begin_time_millis: BasicMetric,
pub prefetch_end_time_secs: BasicMetric,
pub prefetch_end_time_millis: BasicMetric,
pub buffered_backend_size: BasicMetric,
pub data_all_ready: AtomicBool,
}
@@ -780,6 +785,18 @@ impl BlobcacheMetrics {
pub fn export_metrics(&self) -> IoStatsResult<String> {
serde_json::to_string(self).map_err(IoStatsError::Serialize)
}

pub fn calculate_prefetch_metrics(&self, begin_time: SystemTime) {
let now = SystemTime::now();
if let Ok(ref t) = now.duration_since(SystemTime::UNIX_EPOCH) {
self.prefetch_end_time_secs.set(t.as_secs());
self.prefetch_end_time_millis.set(t.subsec_millis() as u64);
}
if let Ok(ref t) = now.duration_since(begin_time) {
let elapsed = saturating_duration_millis(t);
self.prefetch_cumulative_time_millis.add(elapsed);
}
}
}

#[cfg(test)]
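
calculate_prefetch_metrics relies on saturating_duration_millis, a helper that already exists elsewhere in this module and is not shown in the diff. A plausible sketch of its behavior, assuming it converts a Duration to whole milliseconds and clamps rather than overflowing:

use std::time::Duration;

// Assumed behavior: Duration::as_millis() yields a u128, so clamp it into
// the u64 range used by the metric counters instead of overflowing.
fn saturating_duration_millis(d: &Duration) -> u64 {
    u64::try_from(d.as_millis()).unwrap_or(u64::MAX)
}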
