Skip to content

Commit

Permalink
pageserver: add metric pageserver_secondary_resident_physical_size (#…
Browse files Browse the repository at this point in the history
…8204)

## Problem

We lack visibility of how much local disk space is used by secondary
tenant locations

Close: #8181

## Summary of changes

- Add `pageserver_secondary_resident_physical_size`, tagged by tenant
- Register & de-register label sets from SecondaryTenant
- Add+use wrappers in SecondaryDetail that update metrics when
adding+removing layers/timelines
  • Loading branch information
jcsp authored and VladLazar committed Jul 8, 2024
1 parent feeb2dc commit 86c8ba2
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 50 deletions.
11 changes: 10 additions & 1 deletion pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ static STANDBY_HORIZON: Lazy<IntGaugeVec> = Lazy::new(|| {
static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_resident_physical_size",
"The size of the layer files present in the pageserver's filesystem.",
"The size of the layer files present in the pageserver's filesystem, for attached locations.",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
Expand Down Expand Up @@ -1691,6 +1691,15 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| {
}
});

pub(crate) static SECONDARY_RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_secondary_resident_physical_size",
"The size of the layer files present in the pageserver's filesystem, for secondary locations.",
&["tenant_id", "shard_id"]
)
.expect("failed to define a metric")
});

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteOpKind {
Upload,
Expand Down
37 changes: 27 additions & 10 deletions pageserver/src/tenant/secondary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use super::{
storage_layer::LayerName,
};

use crate::metrics::SECONDARY_RESIDENT_PHYSICAL_SIZE;
use metrics::UIntGauge;
use pageserver_api::{
models,
shard::{ShardIdentity, TenantShardId},
Expand Down Expand Up @@ -99,6 +101,17 @@ pub(crate) struct SecondaryTenant {

// Public state indicating overall progress of downloads relative to the last heatmap seen
pub(crate) progress: std::sync::Mutex<models::SecondaryProgress>,

// Sum of layer sizes on local disk
pub(super) resident_size_metric: UIntGauge,
}

impl Drop for SecondaryTenant {
fn drop(&mut self) {
let tenant_id = self.tenant_shard_id.tenant_id.to_string();
let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
}
}

impl SecondaryTenant {
Expand All @@ -108,6 +121,12 @@ impl SecondaryTenant {
tenant_conf: TenantConfOpt,
config: &SecondaryLocationConfig,
) -> Arc<Self> {
let tenant_id = tenant_shard_id.tenant_id.to_string();
let shard_id = format!("{}", tenant_shard_id.shard_slug());
let resident_size_metric = SECONDARY_RESIDENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id])
.unwrap();

Arc::new(Self {
tenant_shard_id,
// todo: shall we make this a descendent of the
Expand All @@ -123,6 +142,8 @@ impl SecondaryTenant {
detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),

progress: std::sync::Mutex::default(),

resident_size_metric,
})
}

Expand Down Expand Up @@ -211,16 +232,12 @@ impl SecondaryTenant {
// have to 100% match what is on disk, because it's a best-effort warming
// of the cache.
let mut detail = this.detail.lock().unwrap();
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
let removed = timeline_detail.on_disk_layers.remove(&name);

// We might race with removal of the same layer during downloads, if it was removed
// from the heatmap. If we see that the OnDiskState is gone, then no need to
// do a physical deletion or store in evicted_at.
if let Some(removed) = removed {
removed.remove_blocking();
timeline_detail.evicted_at.insert(name, now);
}
if let Some(removed) =
detail.evict_layer(name, &timeline_id, now, &this.resident_size_metric)
{
// We might race with removal of the same layer during downloads, so finding the layer we
// were trying to remove is optional. Only issue the disk I/O to remove it if we found it.
removed.remove_blocking();
}
})
.await
Expand Down
173 changes: 134 additions & 39 deletions pageserver/src/tenant/secondary/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::tenant::{
use camino::Utf8PathBuf;
use chrono::format::{DelayedFormat, StrftimeItems};
use futures::Future;
use metrics::UIntGauge;
use pageserver_api::models::SecondaryProgress;
use pageserver_api::shard::TenantShardId;
use remote_storage::{DownloadError, Etag, GenericRemoteStorage};
Expand Down Expand Up @@ -131,16 +132,66 @@ impl OnDiskState {
.or_else(fs_ext::ignore_not_found)
.fatal_err("Deleting secondary layer")
}

pub(crate) fn file_size(&self) -> u64 {
self.metadata.file_size
}
}

#[derive(Debug, Clone, Default)]
pub(super) struct SecondaryDetailTimeline {
pub(super) on_disk_layers: HashMap<LayerName, OnDiskState>,
on_disk_layers: HashMap<LayerName, OnDiskState>,

/// We remember when layers were evicted, to prevent re-downloading them.
pub(super) evicted_at: HashMap<LayerName, SystemTime>,
}

impl SecondaryDetailTimeline {
pub(super) fn remove_layer(
&mut self,
name: &LayerName,
resident_metric: &UIntGauge,
) -> Option<OnDiskState> {
let removed = self.on_disk_layers.remove(name);
if let Some(removed) = &removed {
resident_metric.sub(removed.file_size());
}
removed
}

/// `local_path`
fn touch_layer<F>(
&mut self,
conf: &'static PageServerConf,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
touched: &HeatMapLayer,
resident_metric: &UIntGauge,
local_path: F,
) where
F: FnOnce() -> Utf8PathBuf,
{
use std::collections::hash_map::Entry;
match self.on_disk_layers.entry(touched.name.clone()) {
Entry::Occupied(mut v) => {
v.get_mut().access_time = touched.access_time;
}
Entry::Vacant(e) => {
e.insert(OnDiskState::new(
conf,
tenant_shard_id,
timeline_id,
touched.name.clone(),
touched.metadata.clone(),
touched.access_time,
local_path(),
));
resident_metric.add(touched.metadata.file_size);
}
}
}
}

// Aspects of a heatmap that we remember after downloading it
#[derive(Clone, Debug)]
struct DownloadSummary {
Expand All @@ -158,7 +209,7 @@ pub(super) struct SecondaryDetail {

last_download: Option<DownloadSummary>,
next_download: Option<Instant>,
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
}

/// Helper for logging SystemTime
Expand Down Expand Up @@ -191,6 +242,38 @@ impl SecondaryDetail {
}
}

pub(super) fn evict_layer(
&mut self,
name: LayerName,
timeline_id: &TimelineId,
now: SystemTime,
resident_metric: &UIntGauge,
) -> Option<OnDiskState> {
let timeline = self.timelines.get_mut(timeline_id)?;
let removed = timeline.remove_layer(&name, resident_metric);
if removed.is_some() {
timeline.evicted_at.insert(name, now);
}
removed
}

pub(super) fn remove_timeline(
&mut self,
timeline_id: &TimelineId,
resident_metric: &UIntGauge,
) {
let removed = self.timelines.remove(timeline_id);
if let Some(removed) = removed {
resident_metric.sub(
removed
.on_disk_layers
.values()
.map(|l| l.metadata.file_size)
.sum(),
);
}
}

/// Additionally returns the total number of layers, used for more stable relative access time
/// based eviction.
pub(super) fn get_layers_for_eviction(
Expand Down Expand Up @@ -601,8 +684,13 @@ impl<'a> TenantDownloader<'a> {
Some(t) => t,
None => {
// We have no existing state: need to scan local disk for layers first.
let timeline_state =
init_timeline_state(self.conf, tenant_shard_id, timeline).await;
let timeline_state = init_timeline_state(
self.conf,
tenant_shard_id,
timeline,
&self.secondary_state.resident_size_metric,
)
.await;

// Re-acquire detail lock now that we're done with async load from local FS
self.secondary_state
Expand Down Expand Up @@ -671,6 +759,25 @@ impl<'a> TenantDownloader<'a> {
.await?;
}

// Metrics consistency check in testing builds
if cfg!(feature = "testing") {
let detail = self.secondary_state.detail.lock().unwrap();
let resident_size = detail
.timelines
.values()
.map(|tl| {
tl.on_disk_layers
.values()
.map(|v| v.metadata.file_size)
.sum::<u64>()
})
.sum::<u64>();
assert_eq!(
resident_size,
self.secondary_state.resident_size_metric.get()
);
}

// Only update last_etag after a full successful download: this way will not skip
// the next download, even if the heatmap's actual etag is unchanged.
self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary {
Expand Down Expand Up @@ -783,7 +890,7 @@ impl<'a> TenantDownloader<'a> {
for delete_timeline in &delete_timelines {
// We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
// from disk fails that will be a fatal error.
detail.timelines.remove(delete_timeline);
detail.remove_timeline(delete_timeline, &self.secondary_state.resident_size_metric);
}
}

Expand All @@ -801,7 +908,7 @@ impl<'a> TenantDownloader<'a> {
let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else {
continue;
};
timeline_state.on_disk_layers.remove(&layer_name);
timeline_state.remove_layer(&layer_name, &self.secondary_state.resident_size_metric);
}

for timeline_id in delete_timelines {
Expand Down Expand Up @@ -1000,33 +1107,24 @@ impl<'a> TenantDownloader<'a> {
let timeline_detail = detail.timelines.entry(timeline_id).or_default();

tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());

for t in touched {
use std::collections::hash_map::Entry;
match timeline_detail.on_disk_layers.entry(t.name.clone()) {
Entry::Occupied(mut v) => {
v.get_mut().access_time = t.access_time;
}
Entry::Vacant(e) => {
let local_path = local_layer_path(
touched.into_iter().for_each(|t| {
timeline_detail.touch_layer(
self.conf,
tenant_shard_id,
&timeline_id,
&t,
&self.secondary_state.resident_size_metric,
|| {
local_layer_path(
self.conf,
tenant_shard_id,
&timeline_id,
&t.name,
&t.metadata.generation,
);
e.insert(OnDiskState::new(
self.conf,
tenant_shard_id,
&timeline_id,
t.name,
t.metadata.clone(),
t.access_time,
local_path,
));
}
}
}
)
},
)
});
}

result
Expand Down Expand Up @@ -1135,6 +1233,7 @@ async fn init_timeline_state(
conf: &'static PageServerConf,
tenant_shard_id: &TenantShardId,
heatmap: &HeatMapTimeline,
resident_metric: &UIntGauge,
) -> SecondaryDetailTimeline {
let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
let mut detail = SecondaryDetailTimeline::default();
Expand Down Expand Up @@ -1210,17 +1309,13 @@ async fn init_timeline_state(
} else {
// We expect the access time to be initialized immediately afterwards, when
// the latest heatmap is applied to the state.
detail.on_disk_layers.insert(
name.clone(),
OnDiskState::new(
conf,
tenant_shard_id,
&heatmap.timeline_id,
name,
remote_meta.metadata.clone(),
remote_meta.access_time,
file_path,
),
detail.touch_layer(
conf,
tenant_shard_id,
&heatmap.timeline_id,
remote_meta,
resident_metric,
|| file_path,
);
}
}
Expand Down

0 comments on commit 86c8ba2

Please sign in to comment.