diff --git a/Cargo.lock b/Cargo.lock
index 6be08d16b1d2..d526e48198e3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -917,6 +917,7 @@ dependencies = [
  "reqwest",
  "safekeeper_api",
  "serde",
+ "serde_json",
  "serde_with",
  "storage_broker",
  "tar",
@@ -2421,6 +2422,7 @@ dependencies = [
  "crc32c",
  "criterion",
  "crossbeam-utils",
+ "either",
  "enum-map",
  "enumset",
  "fail",
@@ -2484,6 +2486,7 @@ dependencies = [
  "enum-map",
  "postgres_ffi",
  "serde",
+ "serde_json",
  "serde_with",
  "utils",
  "workspace_hack",
diff --git a/Cargo.toml b/Cargo.toml
index 9033671f5553..eaa25b423aa8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -38,6 +38,7 @@ comfy-table = "6.1"
 const_format = "0.2"
 crc32c = "0.6"
 crossbeam-utils = "0.8.5"
+either = "1.8"
 enum-map = "2.4.2"
 enumset = "1.0.12"
 fail = "0.5.0"
diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml
index 0b2f561d391b..309887e1fa3e 100644
--- a/control_plane/Cargo.toml
+++ b/control_plane/Cargo.toml
@@ -15,6 +15,7 @@ postgres.workspace = true
 regex.workspace = true
 reqwest = { workspace = true, features = ["blocking", "json"] }
 serde.workspace = true
+serde_json.workspace = true
 serde_with.workspace = true
 tar.workspace = true
 thiserror.workspace = true
diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index 9cebe028e4d2..c49bd39f09b9 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -419,6 +419,11 @@ impl PageServerNode {
                     .map(|x| x.parse::<bool>())
                     .transpose()
                     .context("Failed to parse 'trace_read_requests' as bool")?,
+                eviction_policy: settings
+                    .get("eviction_policy")
+                    .map(|x| serde_json::from_str(x))
+                    .transpose()
+                    .context("Failed to parse 'eviction_policy' json")?,
             })
             .send()?
             .error_from_body()?;
diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml
index dafb246632ce..7709da10721c 100644
--- a/libs/pageserver_api/Cargo.toml
+++ b/libs/pageserver_api/Cargo.toml
@@ -14,5 +14,6 @@ byteorder.workspace = true
 utils.workspace = true
 postgres_ffi.workspace = true
 enum-map.workspace = true
+serde_json.workspace = true
 workspace_hack.workspace = true
 
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 9cdcf3a17391..3ac7e31ec294 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -155,6 +155,11 @@ pub struct TenantConfigRequest {
     pub lagging_wal_timeout: Option,
     pub max_lsn_wal_lag: Option,
     pub trace_read_requests: Option<bool>,
+    // We defer the parsing of the eviction_policy field to the request handler.
+    // Otherwise we'd have to move the types for eviction policy into this package.
+    // We might do that once the eviction feature has stabilized.
+    // For now, this field is not even documented in the openapi_spec.yml.
+    pub eviction_policy: Option<serde_json::Value>,
 }
 
 impl TenantConfigRequest {
@@ -174,6 +179,7 @@ impl TenantConfigRequest {
             lagging_wal_timeout: None,
             max_lsn_wal_lag: None,
             trace_read_requests: None,
+            eviction_policy: None,
         }
     }
 }
@@ -263,11 +269,11 @@ pub struct LayerResidenceEvent {
     ///
     #[serde(rename = "timestamp_millis_since_epoch")]
     #[serde_as(as = "serde_with::TimestampMilliSeconds")]
-    timestamp: SystemTime,
+    pub timestamp: SystemTime,
     /// The new residence status of the layer.
-    status: LayerResidenceStatus,
+    pub status: LayerResidenceStatus,
     /// The reason why we had to record this event.
-    reason: LayerResidenceEventReason,
+    pub reason: LayerResidenceEventReason,
 }
 
 /// The reason for recording a given [`ResidenceEvent`].
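Note (aside, not part of the diff): given `#[serde(tag = "kind")]` and the humantime-serialized durations on `EvictionPolicy` (see pageserver/src/tenant/config.rs further down), the raw JSON carried in `TenantConfigRequest::eviction_policy` could look like the sketch below; the concrete period/threshold values are made up.

    // Hypothetical client-side construction of the pass-through JSON value:
    let eviction_policy: Option<serde_json::Value> = Some(serde_json::json!({
        "kind": "LayerAccessThreshold",
        "period": "10m",
        "threshold": "24h",
    }));
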
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index f3ad2c5de621..d2f0b84863b3 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -23,6 +23,7 @@ const_format.workspace = true
 consumption_metrics.workspace = true
 crc32c.workspace = true
 crossbeam-utils.workspace = true
+either.workspace = true
 fail.workspace = true
 futures.workspace = true
 git-version.workspace = true
@@ -51,7 +52,7 @@ thiserror.workspace = true
 tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
 tokio-postgres.workspace = true
 tokio-util.workspace = true
-toml_edit.workspace = true
+toml_edit = { workspace = true, features = [ "serde" ] }
 tracing.workspace = true
 url.workspace = true
 walkdir.workspace = true
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index f88895a97064..309e5367a45c 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -731,6 +731,13 @@ impl PageServerConf {
             })?);
         }
 
+        if let Some(eviction_policy) = item.get("eviction_policy") {
+            t_conf.eviction_policy = Some(
+                toml_edit::de::from_item(eviction_policy.clone())
+                    .context("parse eviction_policy")?,
+            );
+        }
+
         Ok(t_conf)
     }
 
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 229cf96ee37b..6a9232e097aa 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -797,6 +797,14 @@ async fn update_tenant_config_handler(
         );
     }
 
+    if let Some(eviction_policy) = request_data.eviction_policy {
+        tenant_conf.eviction_policy = Some(
+            serde_json::from_value(eviction_policy)
+                .context("parse field `eviction_policy`")
+                .map_err(ApiError::BadRequest)?,
+        );
+    }
+
     let state = get_state(&request);
     mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
         .instrument(info_span!("tenant_config", tenant = ?tenant_id))
diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs
index c4f213e75572..2734031a09e0 100644
--- a/pageserver/src/task_mgr.rs
+++ b/pageserver/src/task_mgr.rs
@@ -231,6 +231,9 @@ pub enum TaskKind {
     // Compaction. One per tenant.
     Compaction,
 
+    // Eviction. One per timeline.
+    Eviction,
+
     // Initial logical size calculation
     InitialLogicalSizeCalculation,
 
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index bc943372f8fe..23210b98d5bd 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -2757,6 +2757,7 @@ pub mod harness {
             lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
             max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
             trace_read_requests: Some(tenant_conf.trace_read_requests),
+            eviction_policy: Some(tenant_conf.eviction_policy),
         }
     }
 }
diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs
index 087cff2537de..fca08dd51aa6 100644
--- a/pageserver/src/tenant/config.rs
+++ b/pageserver/src/tenant/config.rs
@@ -91,6 +91,7 @@ pub struct TenantConf {
     /// to avoid eager reconnects.
     pub max_lsn_wal_lag: NonZeroU64,
     pub trace_read_requests: bool,
+    pub eviction_policy: EvictionPolicy,
 }
 
 /// Same as TenantConf, but this struct preserves the information about
@@ -153,6 +154,34 @@ pub struct TenantConfOpt {
     #[serde(skip_serializing_if = "Option::is_none")]
     #[serde(default)]
     pub trace_read_requests: Option<bool>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(default)]
+    pub eviction_policy: Option<EvictionPolicy>,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(tag = "kind")]
+pub enum EvictionPolicy {
+    NoEviction,
+    LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
+}
+
+impl EvictionPolicy {
+    pub fn discriminant_str(&self) -> &'static str {
+        match self {
+            EvictionPolicy::NoEviction => "NoEviction",
+            EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+pub struct EvictionPolicyLayerAccessThreshold {
+    #[serde(with = "humantime_serde")]
+    pub period: Duration,
+    #[serde(with = "humantime_serde")]
+    pub threshold: Duration,
 }
 
 impl TenantConfOpt {
@@ -189,6 +218,7 @@ impl TenantConfOpt {
             trace_read_requests: self
                 .trace_read_requests
                 .unwrap_or(global_conf.trace_read_requests),
+            eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
         }
     }
 
@@ -261,6 +291,7 @@
             max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
                 .expect("cannot parse default max walreceiver Lsn wal lag"),
             trace_read_requests: false,
+            eviction_policy: EvictionPolicy::NoEviction,
         }
     }
 }
diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index 6cf38f87379d..9198cfd1dfce 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -13,6 +13,7 @@ use crate::task_mgr::TaskKind;
 use crate::walrecord::NeonWalRecord;
 use anyhow::Result;
 use bytes::Bytes;
+use either::Either;
 use enum_map::EnumMap;
 use enumset::EnumSet;
 use pageserver_api::models::LayerAccessKind;
@@ -92,7 +93,23 @@ pub enum ValueReconstructResult {
 }
 
 #[derive(Debug)]
-pub struct LayerAccessStats(Mutex<LayerAccessStatsInner>);
+pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
+
+/// This struct holds two instances of [`LayerAccessStatsInner`].
+/// Accesses are recorded to both instances.
+/// The `for_scraping_api` instance can be reset from the management API via [`LayerAccessStatsReset`].
+/// The `for_eviction_policy` is never reset.
+#[derive(Debug, Default, Clone)]
+struct LayerAccessStatsLocked {
+    for_scraping_api: LayerAccessStatsInner,
+    for_eviction_policy: LayerAccessStatsInner,
+}
+
+impl LayerAccessStatsLocked {
+    fn iter_mut(&mut self) -> impl Iterator<Item = &mut LayerAccessStatsInner> {
+        [&mut self.for_scraping_api, &mut self.for_eviction_policy].into_iter()
+    }
+}
 
 #[derive(Debug, Default, Clone)]
 struct LayerAccessStatsInner {
@@ -104,10 +121,10 @@ struct LayerAccessStatsInner {
 }
 
 #[derive(Debug, Clone, Copy)]
-struct LayerAccessStatFullDetails {
-    when: SystemTime,
-    task_kind: TaskKind,
-    access_kind: LayerAccessKind,
+pub(super) struct LayerAccessStatFullDetails {
+    pub(super) when: SystemTime,
+    pub(super) task_kind: TaskKind,
+    pub(super) access_kind: LayerAccessKind,
 }
 
 #[derive(Clone, Copy, strum_macros::EnumString)]
@@ -142,13 +159,13 @@ impl LayerAccessStatFullDetails {
 
 impl LayerAccessStats {
     pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
-        let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default()));
+        let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
         new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
         new
     }
 
     pub(crate) fn for_new_layer_file() -> Self {
-        let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default()));
+        let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
         new.record_residence_event(
             LayerResidenceStatus::Resident,
             LayerResidenceEventReason::LayerCreate,
@@ -176,37 +193,43 @@ impl LayerAccessStats {
         status: LayerResidenceStatus,
         reason: LayerResidenceEventReason,
     ) {
-        let mut inner = self.0.lock().unwrap();
-        inner
-            .last_residence_changes
-            .write(LayerResidenceEvent::new(status, reason));
+        let mut locked = self.0.lock().unwrap();
+        locked.iter_mut().for_each(|inner| {
+            inner
+                .last_residence_changes
+                .write(LayerResidenceEvent::new(status, reason))
+        });
     }
 
     fn record_access(&self, access_kind: LayerAccessKind, task_kind: TaskKind) {
-        let mut inner = self.0.lock().unwrap();
         let this_access = LayerAccessStatFullDetails {
             when: SystemTime::now(),
             task_kind,
            access_kind,
         };
-        inner.first_access.get_or_insert(this_access);
-        inner.count_by_access_kind[access_kind] += 1;
-        inner.task_kind_flag |= task_kind;
-        inner.last_accesses.write(this_access);
+
+        let mut locked = self.0.lock().unwrap();
+        locked.iter_mut().for_each(|inner| {
+            inner.first_access.get_or_insert(this_access);
+            inner.count_by_access_kind[access_kind] += 1;
+            inner.task_kind_flag |= task_kind;
+            inner.last_accesses.write(this_access);
+        })
     }
 
     fn as_api_model(
         &self,
         reset: LayerAccessStatsReset,
     ) -> pageserver_api::models::LayerAccessStats {
-        let mut inner = self.0.lock().unwrap();
+        let mut locked = self.0.lock().unwrap();
+        let inner = &mut locked.for_scraping_api;
         let LayerAccessStatsInner {
             first_access,
             count_by_access_kind,
             task_kind_flag,
             last_accesses,
             last_residence_changes,
-        } = &*inner;
+        } = inner;
         let ret = pageserver_api::models::LayerAccessStats {
             access_count_by_access_kind: count_by_access_kind
                 .iter()
@@ -231,6 +254,20 @@ impl LayerAccessStats {
         }
         ret
     }
+
+    pub(super) fn most_recent_access_or_residence_event(
+        &self,
+    ) -> Either<LayerAccessStatFullDetails, LayerResidenceEvent> {
+        let locked = self.0.lock().unwrap();
+        let inner = &locked.for_eviction_policy;
+        match inner.last_accesses.recent() {
+            Some(a) => Either::Left(*a),
+            None => match inner.last_residence_changes.recent() {
+                Some(e) => Either::Right(e.clone()),
+                None => unreachable!("constructors for LayerAccessStats ensure that there's always a residence change event"),
+            }
+        }
+    }
 }
 
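// Aside (not part of the diff): usage sketch for the new accessor. A caller holding a
// `&dyn PersistentLayer` can collapse the returned `Either` into a single "last activity"
// timestamp; the eviction task further down in this diff does exactly this.
// The helper name `last_activity_ts` is hypothetical.
fn last_activity_ts(layer: &dyn PersistentLayer) -> SystemTime {
    match layer.access_stats().most_recent_access_or_residence_event() {
        Either::Left(access) => access.when,
        Either::Right(residence_change) => residence_change.timestamp,
    }
}
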
 /// Supertrait of the [`Layer`] trait that captures the bare minimum interface
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index e1156e727068..bcbf8a12b45a 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1,5 +1,6 @@
 //!
 
+mod eviction_task;
 mod walreceiver;
 
 use anyhow::{anyhow, bail, ensure, Context};
@@ -47,7 +48,7 @@ use crate::metrics::TimelineMetrics;
 use crate::pgdatadir_mapping::LsnForTimestamp;
 use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
 use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
-use crate::tenant::config::TenantConfOpt;
+use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
 use pageserver_api::reltag::RelTag;
 use postgres_connection::PgConnectionConfig;
 
@@ -801,6 +802,7 @@ impl Timeline {
     pub fn activate(self: &Arc<Self>) {
         self.set_state(TimelineState::Active);
         self.launch_wal_receiver();
+        self.launch_eviction_task();
     }
 
     pub fn set_state(&self, new_state: TimelineState) {
@@ -889,7 +891,10 @@ impl Timeline {
         }
     }
 
+    /// Evict multiple layers at once, continuing through errors.
+    ///
     /// Try to evict the given `layers_to_evict` by
+    ///
     /// 1. Replacing the given layer object in the layer map with a corresponding [`RemoteLayer`] object.
     /// 2. Deleting the now unreferenced layer file from disk.
     ///
@@ -1057,6 +1062,13 @@ impl Timeline {
             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
     }
 
+    fn get_eviction_policy(&self) -> EvictionPolicy {
+        let tenant_conf = self.tenant_conf.read().unwrap();
+        tenant_conf
+            .eviction_policy
+            .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
+    }
+
     /// Open a Timeline handle.
     ///
     /// Loads the metadata for the timeline into memory, but not the layer map.
diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs
new file mode 100644
index 000000000000..e3e7ce4c9df9
--- /dev/null
+++ b/pageserver/src/tenant/timeline/eviction_task.rs
@@ -0,0 +1,199 @@
+//! The per-timeline layer eviction task.
+
+use std::{
+    ops::ControlFlow,
+    sync::Arc,
+    time::{Duration, SystemTime},
+};
+
+use either::Either;
+use tokio::time::Instant;
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, error, info, instrument, warn};
+
+use crate::{
+    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
+    tenant::{
+        config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
+        storage_layer::PersistentLayer,
+    },
+};
+
+use super::Timeline;
+
+impl Timeline {
+    pub(super) fn launch_eviction_task(self: &Arc<Self>) {
+        let self_clone = Arc::clone(self);
+        task_mgr::spawn(
+            BACKGROUND_RUNTIME.handle(),
+            TaskKind::Eviction,
+            Some(self.tenant_id),
+            Some(self.timeline_id),
+            &format!("layer eviction for {}/{}", self.tenant_id, self.timeline_id),
+            false,
+            async move {
+                self_clone.eviction_task(task_mgr::shutdown_token()).await;
+                info!("eviction task finishing");
+                Ok(())
+            },
+        );
+    }
+
+    #[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
+    async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
+        loop {
+            let policy = self.get_eviction_policy();
+            let cf = self.eviction_iteration(&policy, cancel.clone()).await;
+            match cf {
+                ControlFlow::Break(()) => break,
+                ControlFlow::Continue(sleep_until) => {
+                    tokio::select! {
+                        _ = cancel.cancelled() => {
+                            info!("shutting down");
+                            break;
+                        }
+                        _ = tokio::time::sleep_until(sleep_until) => { }
+                    }
+                }
+            }
+        }
+    }
+
+    #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
+    async fn eviction_iteration(
+        self: &Arc<Self>,
+        policy: &EvictionPolicy,
+        cancel: CancellationToken,
+    ) -> ControlFlow<(), Instant> {
+        debug!("eviction iteration: {policy:?}");
+        match policy {
+            EvictionPolicy::NoEviction => {
+                // check again in 10 seconds; XXX config watch mechanism
+                ControlFlow::Continue(Instant::now() + Duration::from_secs(10))
+            }
+            EvictionPolicy::LayerAccessThreshold(p) => {
+                let start = Instant::now();
+                match self.eviction_iteration_threshold(p, cancel).await {
+                    ControlFlow::Break(()) => return ControlFlow::Break(()),
+                    ControlFlow::Continue(()) => (),
+                }
+                let elapsed = start.elapsed();
+                if elapsed > p.period {
+                    warn!(
+                        configured_period = %humantime::format_duration(p.period),
+                        last_period = %humantime::format_duration(elapsed),
+                        "this eviction period took longer than the configured period"
+                    );
+                }
+                ControlFlow::Continue(start + p.period)
+            }
+        }
+    }
+
+    async fn eviction_iteration_threshold(
+        self: &Arc<Self>,
+        p: &EvictionPolicyLayerAccessThreshold,
+        cancel: CancellationToken,
+    ) -> ControlFlow<()> {
+        let now = SystemTime::now();
+
+        #[allow(dead_code)]
+        #[derive(Debug, Default)]
+        struct EvictionStats {
+            not_considered_due_to_clock_skew: usize,
+            candidates: usize,
+            evicted: usize,
+            errors: usize,
+            not_evictable: usize,
+            skipped_for_shutdown: usize,
+        }
+        let mut stats = EvictionStats::default();
+        // Gather layers for eviction.
+        // NB: all the checks can be invalidated as soon as we release the layer map lock.
+        // We don't want to hold the layer map lock during eviction.
+        // So, we just need to deal with this.
+        let candidates: Vec<Arc<dyn PersistentLayer>> = {
+            let layers = self.layers.read().unwrap();
+            let mut candidates = Vec::new();
+            for hist_layer in layers.iter_historic_layers() {
+                if hist_layer.is_remote_layer() {
+                    continue;
+                }
+                let last_activity_ts = match hist_layer
+                    .access_stats()
+                    .most_recent_access_or_residence_event()
+                {
+                    Either::Left(mra) => mra.when,
+                    Either::Right(re) => re.timestamp,
+                };
+                let no_activity_for = match now.duration_since(last_activity_ts) {
+                    Ok(d) => d,
+                    Err(_e) => {
+                        // NB: don't log the error. If there are many layers and the system clock
+                        // is skewed, we'd be flooding the log.
+                        stats.not_considered_due_to_clock_skew += 1;
+                        continue;
+                    }
+                };
+                if no_activity_for > p.threshold {
+                    candidates.push(hist_layer)
+                }
+            }
+            candidates
+        };
+        stats.candidates = candidates.len();
+
+        let remote_client = match self.remote_client.as_ref() {
+            None => {
+                error!(
+                    num_candidates = candidates.len(),
+                    "no remote storage configured, cannot evict layers"
+                );
+                return ControlFlow::Continue(());
+            }
+            Some(c) => c,
+        };
+
+        let results = match self
+            .evict_layer_batch(remote_client, &candidates[..], cancel)
+            .await
+        {
+            Err(pre_err) => {
+                stats.errors += candidates.len();
+                error!("could not do any evictions: {pre_err:#}");
+                return ControlFlow::Continue(());
+            }
+            Ok(results) => results,
+        };
+        assert_eq!(results.len(), candidates.len());
+        for (l, result) in candidates.iter().zip(results) {
+            match result {
+                None => {
+                    stats.skipped_for_shutdown += 1;
+                }
+                Some(Ok(true)) => {
+                    debug!("evicted layer {l:?}");
+                    stats.evicted += 1;
+                }
+                Some(Ok(false)) => {
+                    debug!("layer is not evictable: {l:?}");
+                    stats.not_evictable += 1;
+                }
+                Some(Err(e)) => {
+                    // This variant is the case where an unexpected error happened during eviction.
+                    // Expected errors that result in non-eviction are `Some(Ok(false))`.
+                    // So, dump Debug here to gather as much info as possible in this rare case.
+                    warn!("failed to evict layer {l:?}: {e:?}");
+                    stats.errors += 1;
+                }
+            }
+        }
+        if stats.not_considered_due_to_clock_skew > 0 || stats.errors > 0 || stats.not_evictable > 0
+        {
+            warn!(stats=?stats, "eviction iteration complete");
+        } else {
+            info!(stats=?stats, "eviction iteration complete");
+        }
+        ControlFlow::Continue(())
+    }
+}
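
Note (aside, not part of the diff): end-to-end parsing sketch tying the two config paths together. The same `EvictionPolicy` shape is parsed from per-tenant TOML via `toml_edit::de::from_item` (pageserver/src/config.rs above) and from the JSON carried by `TenantConfigRequest` via `serde_json::from_value` (pageserver/src/http/routes.rs). The TOML layout, the import path, and the function name below are assumptions for illustration only.

    use pageserver::tenant::config::EvictionPolicy;

    fn parse_eviction_policy_examples() -> anyhow::Result<()> {
        // TOML path, mirroring the per-tenant config parsing in PageServerConf.
        let doc: toml_edit::Document =
            r#"eviction_policy = { kind = "LayerAccessThreshold", period = "10m", threshold = "24h" }"#
                .parse()?;
        let from_toml: EvictionPolicy = toml_edit::de::from_item(doc["eviction_policy"].clone())?;

        // JSON path, mirroring update_tenant_config_handler.
        let from_json: EvictionPolicy = serde_json::from_value(serde_json::json!({
            "kind": "LayerAccessThreshold",
            "period": "10m",
            "threshold": "24h",
        }))?;

        assert_eq!(from_toml, from_json);
        Ok(())
    }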