diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs
index 82701ed4b0b7..0615d50c1aad 100644
--- a/libs/utils/src/lib.rs
+++ b/libs/utils/src/lib.rs
@@ -58,6 +58,8 @@ pub mod pageserver_feedback;
 
 pub mod tracing_span_assert;
 
+pub mod rate_limit;
+
 /// use with fail::cfg("$name", "return(2000)")
 #[macro_export]
 macro_rules! failpoint_sleep_millis_async {
diff --git a/libs/utils/src/rate_limit.rs b/libs/utils/src/rate_limit.rs
new file mode 100644
index 000000000000..557955bb880e
--- /dev/null
+++ b/libs/utils/src/rate_limit.rs
@@ -0,0 +1,66 @@
+//! A helper to rate limit operations.
+
+use std::time::{Duration, Instant};
+
+pub struct RateLimit {
+    last: Option<Instant>,
+    interval: Duration,
+}
+
+impl RateLimit {
+    pub fn new(interval: Duration) -> Self {
+        Self {
+            last: None,
+            interval,
+        }
+    }
+
+    /// Call `f` if the rate limit allows.
+    /// Don't call it otherwise.
+    pub fn call<F: Fn()>(&mut self, f: F) {
+        let now = Instant::now();
+        match self.last {
+            Some(last) if now - last <= self.interval => {
+                // ratelimit
+            }
+            _ => {
+                self.last = Some(now);
+                f();
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::atomic::AtomicUsize;
+
+    #[test]
+    fn basics() {
+        use super::RateLimit;
+        use std::sync::atomic::Ordering::Relaxed;
+        use std::time::Duration;
+
+        let called = AtomicUsize::new(0);
+        let mut f = RateLimit::new(Duration::from_millis(100));
+
+        let cl = || {
+            called.fetch_add(1, Relaxed);
+        };
+
+        f.call(cl);
+        assert_eq!(called.load(Relaxed), 1);
+        f.call(cl);
+        assert_eq!(called.load(Relaxed), 1);
+        f.call(cl);
+        assert_eq!(called.load(Relaxed), 1);
+        std::thread::sleep(Duration::from_millis(100));
+        f.call(cl);
+        assert_eq!(called.load(Relaxed), 2);
+        f.call(cl);
+        assert_eq!(called.load(Relaxed), 2);
+        std::thread::sleep(Duration::from_millis(100));
+        f.call(cl);
+        assert_eq!(called.load(Relaxed), 3);
+    }
+}
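For illustration, here is a minimal sketch of how a caller might use the new `RateLimit` helper to throttle a noisy log line. The loop, function, and message below are hypothetical, not part of this patch; the only assumption is that the `utils` crate from this repo is a dependency:

```rust
use std::time::Duration;
use utils::rate_limit::RateLimit;

fn process_items(items: &[u32]) {
    // Emit the progress line at most once per 10 seconds, no matter
    // how fast the loop spins; rate-limited iterations skip the closure.
    let mut progress_log_limit = RateLimit::new(Duration::from_secs(10));
    for (i, _item) in items.iter().enumerate() {
        progress_log_limit.call(|| println!("processed {i} items so far"));
        // ... actual per-item work would go here ...
    }
}

fn main() {
    process_items(&[1, 2, 3]);
}
```

Note that `RateLimit` is deliberately not thread-safe (`call` takes `&mut self`); callers that need to share one limiter across threads wrap it in a `Mutex`, as `latest_activity` does below.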
diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index 2ee723e7c363..d30d6c5c6efb 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -13,9 +13,9 @@ use crate::task_mgr::TaskKind;
 use crate::walrecord::NeonWalRecord;
 use anyhow::Result;
 use bytes::Bytes;
-use either::Either;
 use enum_map::EnumMap;
 use enumset::EnumSet;
+use once_cell::sync::Lazy;
 use pageserver_api::models::LayerAccessKind;
 use pageserver_api::models::{
     HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
@@ -23,8 +23,10 @@ use pageserver_api::models::{
 use std::ops::Range;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
-use std::time::{SystemTime, UNIX_EPOCH};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tracing::warn;
 use utils::history_buffer::HistoryBufferWithDropCounter;
+use utils::rate_limit::RateLimit;
 
 use utils::{
     id::{TenantId, TimelineId},
@@ -37,6 +39,8 @@ pub use image_layer::{ImageLayer, ImageLayerWriter};
 pub use inmemory_layer::InMemoryLayer;
 pub use remote_layer::RemoteLayer;
 
+use super::layer_map::BatchedUpdates;
+
 pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
 where
     T: PartialOrd,
@@ -158,41 +162,82 @@ impl LayerAccessStatFullDetails {
 }
 
 impl LayerAccessStats {
-    pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
-        let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
-        new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
-        new
+    /// Create an empty stats object.
+    ///
+    /// The caller is responsible for recording a residence event
+    /// using [`record_residence_event`] before calling `latest_activity`.
+    /// If they don't, [`latest_activity`] will return `None`.
+    pub(crate) fn empty_will_record_residence_event_later() -> Self {
+        LayerAccessStats(Mutex::default())
     }
 
-    pub(crate) fn for_new_layer_file() -> Self {
+    /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
+    ///
+    /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
+    pub(crate) fn for_loading_layer<L>(
+        layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
+        status: LayerResidenceStatus,
+    ) -> Self
+    where
+        L: ?Sized + Layer,
+    {
         let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
         new.record_residence_event(
-            LayerResidenceStatus::Resident,
-            LayerResidenceEventReason::LayerCreate,
+            layer_map_lock_held_witness,
+            status,
+            LayerResidenceEventReason::LayerLoad,
         );
         new
     }
 
     /// Creates a clone of `self` and records `new_status` in the clone.
-    /// The `new_status` is not recorded in `self`
-    pub(crate) fn clone_for_residence_change(
+    ///
+    /// The `new_status` is not recorded in `self`.
+    ///
+    /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
+    pub(crate) fn clone_for_residence_change<L>(
         &self,
+        layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
         new_status: LayerResidenceStatus,
-    ) -> LayerAccessStats {
+    ) -> LayerAccessStats
+    where
+        L: ?Sized + Layer,
+    {
         let clone = {
             let inner = self.0.lock().unwrap();
             inner.clone()
         };
         let new = LayerAccessStats(Mutex::new(clone));
-        new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
+        new.record_residence_event(
+            layer_map_lock_held_witness,
+            new_status,
+            LayerResidenceEventReason::ResidenceChange,
+        );
         new
     }
 
-    fn record_residence_event(
+    /// Record a change in layer residency.
+    ///
+    /// Recording the event must happen while holding the layer map lock to
+    /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
+    /// can do an "imitate access" to this layer, before it observes `now - latest_activity() > threshold`.
+    ///
+    /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
+    /// the following race could happen:
+    ///
+    /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
+    /// - Eviction: the imitate-access logical size calculation runs. It accesses the L0 layers because the L1 layer is not yet in the layer map.
+    /// - Compact: Grab the layer map lock, add the new L1 to the layer map and remove the L0s, release the layer map lock.
+    /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
+    ///
+    pub(crate) fn record_residence_event<L>(
        &self,
+        _layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
         status: LayerResidenceStatus,
         reason: LayerResidenceEventReason,
-    ) {
+    ) where
+        L: ?Sized + Layer,
+    {
         let mut locked = self.0.lock().unwrap();
         locked.iter_mut().for_each(|inner| {
             inner
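The `layer_map_lock_held_witness: &BatchedUpdates<'_, L>` parameter is a witness argument: a `BatchedUpdates` can only be obtained from the layer map while its lock is held, so requiring a reference to one turns "the lock is held" from a comment into a compile-time precondition. A minimal, self-contained sketch of the pattern follows; the types here are illustrative stand-ins, not the pageserver's real ones:

```rust
use std::sync::{Mutex, MutexGuard};

struct LayerMap;

/// Witness type: can only be constructed from a live `MutexGuard`,
/// so holding a `&LockWitness` proves the lock is currently held.
struct LockWitness<'a>(&'a MutexGuard<'a, LayerMap>);

/// Functions that must run under the lock take the witness as a parameter,
/// even if they never touch it (hence the leading underscore in the patch).
fn record_residence_event(_witness: &LockWitness<'_>) {
    // The layer map cannot change concurrently while this runs.
}

fn main() {
    let map = Mutex::new(LayerMap);
    let guard = map.lock().unwrap();
    let witness = LockWitness(&guard);
    record_residence_event(&witness);
    // `witness` borrows `guard`, so it cannot outlive the critical section.
}
```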
@@ -255,24 +300,37 @@ impl LayerAccessStats {
         ret
     }
 
-    fn most_recent_access_or_residence_event(
-        &self,
-    ) -> Either<LayerAccessStatFullDetails, LayerResidenceEvent> {
+    /// Get the latest access timestamp, falling back to the latest residence event.
+    ///
+    /// This function can only return `None` if there has not yet been a call to the
+    /// [`record_residence_event`] method. That would generally be considered an
+    /// implementation error. This function logs a rate-limited warning in that case.
+    ///
+    /// TODO: use the type system to avoid the need for `fallback`.
+    /// The approach in https://github.com/neondatabase/neon/pull/3775
+    /// could be used to enforce that a residence event is recorded
+    /// before a layer is added to the layer map. We could also have
+    /// a layer wrapper type that holds the LayerAccessStats, and ensure
+    /// that that type can only be produced by inserting into the layer map.
+    pub(crate) fn latest_activity(&self) -> Option<SystemTime> {
         let locked = self.0.lock().unwrap();
         let inner = &locked.for_eviction_policy;
         match inner.last_accesses.recent() {
-            Some(a) => Either::Left(*a),
+            Some(a) => Some(a.when),
             None => match inner.last_residence_changes.recent() {
-                Some(e) => Either::Right(e.clone()),
-                None => unreachable!("constructors for LayerAccessStats ensure that there's always a residence change event"),
-            }
-        }
-    }
-
-    pub(crate) fn latest_activity(&self) -> SystemTime {
-        match self.most_recent_access_or_residence_event() {
-            Either::Left(mra) => mra.when,
-            Either::Right(re) => re.timestamp,
+                Some(e) => Some(e.timestamp),
+                None => {
+                    static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> =
+                        Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10)))));
+                    let mut guard = WARN_RATE_LIMIT.lock().unwrap();
+                    guard.0 += 1;
+                    let occurrences = guard.0;
+                    guard.1.call(move || {
+                        warn!(parent: None, occurrences, "latest_activity not available, this is an implementation bug, using fallback value");
+                    });
+                    None
+                }
+            },
         }
     }
 }
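As a hedged sketch of what that TODO might look like, here is one way to make "a residence event was recorded" a type-level invariant, so both the `Option` and the fallback disappear. All names below are invented for illustration and simplify the real types considerably:

```rust
use std::time::SystemTime;

struct AccessStats {
    last_residence_change: Option<SystemTime>,
}

/// Hypothetical wrapper: the only way to obtain one is `LayerMap::insert`,
/// which records a residence event first.
struct InsertedLayer {
    stats: AccessStats,
}

impl InsertedLayer {
    /// No `Option` and no fallback: the constructor guarantees the event exists.
    fn latest_activity(&self) -> SystemTime {
        self.stats
            .last_residence_change
            .expect("recorded by LayerMap::insert at construction")
    }
}

struct LayerMap {
    layers: Vec<InsertedLayer>,
}

impl LayerMap {
    fn insert(&mut self, mut stats: AccessStats) {
        // Recording happens here, under exclusive access to the map,
        // the moral equivalent of "while holding the layer map lock".
        stats.last_residence_change = Some(SystemTime::now());
        self.layers.push(InsertedLayer { stats });
    }
}

fn main() {
    let mut map = LayerMap { layers: Vec::new() };
    map.insert(AccessStats { last_residence_change: None });
    // Every layer in the map now has a recorded event, by construction.
    let ts = map.layers.last().unwrap().latest_activity();
    println!("latest activity: {ts:?}");
}
```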
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 98cbcc5f07d5..ba3ab6dd4c27 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -57,7 +57,7 @@ use utils::{
 
 use super::{
     DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
-    LayerKeyIter, LayerResidenceStatus, PathOrConf,
+    LayerKeyIter, PathOrConf,
 };
 
 ///
@@ -637,7 +637,7 @@ impl DeltaLayer {
             key_range: summary.key_range,
             lsn_range: summary.lsn_range,
             file_size: metadata.len(),
-            access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
+            access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
             inner: RwLock::new(DeltaLayerInner {
                 loaded: false,
                 file: None,
@@ -808,7 +808,7 @@ impl DeltaLayerWriterInner {
             key_range: self.key_start..key_end,
             lsn_range: self.lsn_range.clone(),
             file_size: metadata.len(),
-            access_stats: LayerAccessStats::for_new_layer_file(),
+            access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
             inner: RwLock::new(DeltaLayerInner {
                 loaded: false,
                 file: None,
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index a99b1b491f3d..d298b3e852ef 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -53,7 +53,7 @@ use utils::{
 };
 
 use super::filename::{ImageFileName, LayerFileName};
-use super::{Layer, LayerAccessStatsReset, LayerIter, LayerResidenceStatus, PathOrConf};
+use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
 
 ///
 /// Header stored in the beginning of the file
@@ -438,7 +438,7 @@ impl ImageLayer {
             key_range: summary.key_range,
             lsn: summary.lsn,
             file_size: metadata.len(),
-            access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
+            access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
             inner: RwLock::new(ImageLayerInner {
                 file: None,
                 loaded: false,
@@ -598,7 +598,7 @@ impl ImageLayerWriterInner {
             key_range: self.key_range.clone(),
             lsn: self.lsn,
             file_size: metadata.len(),
-            access_stats: LayerAccessStats::for_new_layer_file(),
+            access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
             inner: RwLock::new(ImageLayerInner {
                 loaded: false,
                 file: None,
diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs
index 2eb7eb0cb68a..2106587ab20b 100644
--- a/pageserver/src/tenant/storage_layer/remote_layer.rs
+++ b/pageserver/src/tenant/storage_layer/remote_layer.rs
@@ -4,6 +4,7 @@ use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::repository::Key;
+use crate::tenant::layer_map::BatchedUpdates;
 use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
 use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
 use anyhow::{bail, Result};
@@ -246,11 +247,15 @@ impl RemoteLayer {
     }
 
     /// Create a Layer struct representing this layer, after it has been downloaded.
-    pub fn create_downloaded_layer(
+    pub fn create_downloaded_layer<L>(
         &self,
+        layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
         conf: &'static PageServerConf,
         file_size: u64,
-    ) -> Arc<dyn PersistentLayer> {
+    ) -> Arc<dyn PersistentLayer>
+    where
+        L: ?Sized + Layer,
+    {
         if self.is_delta {
             let fname = DeltaFileName {
                 key_range: self.key_range.clone(),
@@ -262,8 +267,10 @@ impl RemoteLayer {
                 self.tenantid,
                 &fname,
                 file_size,
-                self.access_stats
-                    .clone_for_residence_change(LayerResidenceStatus::Resident),
+                self.access_stats.clone_for_residence_change(
+                    layer_map_lock_held_witness,
+                    LayerResidenceStatus::Resident,
+                ),
             ))
         } else {
             let fname = ImageFileName {
@@ -276,8 +283,10 @@ impl RemoteLayer {
                 self.tenantid,
                 &fname,
                 file_size,
-                self.access_stats
-                    .clone_for_residence_change(LayerResidenceStatus::Resident),
+                self.access_stats.clone_for_residence_change(
+                    layer_map_lock_held_witness,
+                    LayerResidenceStatus::Resident,
+                ),
             ))
         }
     }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index bc55c2091c81..90f2951aefc6 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -11,7 +11,8 @@ use itertools::Itertools;
 use once_cell::sync::OnceCell;
 use pageserver_api::models::{
     DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
-    DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState,
+    DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
+    TimelineState,
 };
 use remote_storage::GenericRemoteStorage;
 use storage_broker::BrokerClientChannel;
@@ -1111,7 +1112,7 @@ impl Timeline {
                     &layer_metadata,
                     local_layer
                         .access_stats()
-                        .clone_for_residence_change(LayerResidenceStatus::Evicted),
+                        .clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
                 ),
                 LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
                     self.tenant_id,
@@ -1120,7 +1121,7 @@ impl Timeline {
                     &layer_metadata,
                     local_layer
                         .access_stats()
-                        .clone_for_residence_change(LayerResidenceStatus::Evicted),
+                        .clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
                 ),
             });
 
@@ -1489,7 +1490,7 @@ impl Timeline {
                     self.tenant_id,
                     &imgfilename,
                     file_size,
-                    LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
+                    LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
                 );
 
                 trace!("found layer {}", layer.path().display());
@@ -1521,7 +1522,7 @@ impl Timeline {
                    self.tenant_id,
                    &deltafilename,
                    file_size,
-                    LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
+                    LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
                 );
 
                 trace!("found layer {}", layer.path().display());
@@ -1657,7 +1658,10 @@ impl Timeline {
                         self.timeline_id,
                         imgfilename,
                         &remote_layer_metadata,
-                        LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
+                        LayerAccessStats::for_loading_layer(
+                            &updates,
+                            LayerResidenceStatus::Evicted,
+                        ),
                     );
                     let remote_layer = Arc::new(remote_layer);
 
@@ -1682,7 +1686,10 @@ impl Timeline {
                         self.timeline_id,
                         deltafilename,
                         &remote_layer_metadata,
-                        LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
+                        LayerAccessStats::for_loading_layer(
+                            &updates,
+                            LayerResidenceStatus::Evicted,
+                        ),
                     );
                     let remote_layer = Arc::new(remote_layer);
                     updates.insert_historic(remote_layer);
@@ -2729,11 +2736,16 @@ impl Timeline {
         ])?;
 
         // Add it to the layer map
-        self.layers
-            .write()
-            .unwrap()
-            .batch_update()
-            .insert_historic(Arc::new(new_delta));
+        let l = Arc::new(new_delta);
+        let mut layers = self.layers.write().unwrap();
+        let mut batch_updates = layers.batch_update();
+        l.access_stats().record_residence_event(
+            &batch_updates,
+            LayerResidenceStatus::Resident,
+            LayerResidenceEventReason::LayerCreate,
+        );
+        batch_updates.insert_historic(l);
+        batch_updates.flush();
 
         // update the timeline's physical size
         let sz = new_delta_path.metadata()?.len();
@@ -2938,7 +2950,13 @@ impl Timeline {
             self.metrics
                 .resident_physical_size_gauge
                 .add(metadata.len());
-            updates.insert_historic(Arc::new(l));
+            let l = Arc::new(l);
+            l.access_stats().record_residence_event(
+                &updates,
+                LayerResidenceStatus::Resident,
+                LayerResidenceEventReason::LayerCreate,
+            );
+            updates.insert_historic(l);
         }
         updates.flush();
         drop(layers);
@@ -3371,6 +3389,11 @@ impl Timeline {
             new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
             let x: Arc<dyn PersistentLayer> = Arc::new(l);
+            x.access_stats().record_residence_event(
+                &updates,
+                LayerResidenceStatus::Resident,
+                LayerResidenceEventReason::LayerCreate,
+            );
             updates.insert_historic(x);
         }
 
@@ -3881,9 +3904,9 @@ impl Timeline {
             // Download complete. Replace the RemoteLayer with the corresponding
             // Delta- or ImageLayer in the layer map.
-            let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size);
             let mut layers = self_clone.layers.write().unwrap();
             let mut updates = layers.batch_update();
+            let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
             {
                 use crate::tenant::layer_map::Replacement;
                 let l: Arc<dyn PersistentLayer> = remote_layer.clone();
@@ -4155,7 +4178,15 @@ impl Timeline {
                 continue;
             }
 
-            let last_activity_ts = l.access_stats().latest_activity();
+            let last_activity_ts = l
+                .access_stats()
+                .latest_activity()
+                .unwrap_or_else(|| {
+                    // We only use this fallback if there's an implementation error.
+                    // `latest_activity` already logs a rate-limited warn!().
+                    debug!(layer=%l.filename().file_name(), "latest_activity returned None, using SystemTime::now");
+                    SystemTime::now()
+                });
 
             resident_layers.push(LocalLayerInfoForDiskUsageEviction {
                 layer: l,
diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs
index cf799a880859..58321762ea15 100644
--- a/pageserver/src/tenant/timeline/eviction_task.rs
+++ b/pageserver/src/tenant/timeline/eviction_task.rs
@@ -184,7 +184,14 @@ impl Timeline {
             if hist_layer.is_remote_layer() {
                 continue;
             }
-            let last_activity_ts = hist_layer.access_stats().latest_activity();
+
+            let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
+                // We only use this fallback if there's an implementation error.
+                // `latest_activity` already logs a rate-limited warn!().
+                debug!(layer=%hist_layer.filename().file_name(), "latest_activity returned None, using SystemTime::now");
+                SystemTime::now()
+            });
+
             let no_activity_for = match now.duration_since(last_activity_ts) {
                 Ok(d) => d,
                 Err(_e) => {
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 79b2e5b29040..93fdc609001e 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -292,6 +292,12 @@ def port_distributor(worker_base_port: int) -> PortDistributor:
     return PortDistributor(base_port=worker_base_port, port_number=WORKER_PORT_NUM)
 
 
+@pytest.fixture(scope="session")
+def httpserver_listen_address(port_distributor: PortDistributor):
+    port = port_distributor.get_port()
+    return ("localhost", port)
+
+
 @pytest.fixture(scope="function")
 def default_broker(
     port_distributor: PortDistributor,
diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py
index df542fb84a24..123118889673 100644
--- a/test_runner/regress/test_metric_collection.py
+++ b/test_runner/regress/test_metric_collection.py
@@ -24,13 +24,6 @@ from werkzeug.wrappers.request import Request
 from werkzeug.wrappers.response import Response
 
-
-@pytest.fixture(scope="session")
-def httpserver_listen_address(port_distributor: PortDistributor):
-    port = port_distributor.get_port()
-    return ("localhost", port)
-
-
 # ==============================================================================
 # Storage metrics tests
 # ==============================================================================
diff --git a/test_runner/regress/test_threshold_based_eviction.py b/test_runner/regress/test_threshold_based_eviction.py
new file mode 100644
index 000000000000..c7083d92be89
--- /dev/null
+++ b/test_runner/regress/test_threshold_based_eviction.py
@@ -0,0 +1,179 @@
+import time
+from dataclasses import dataclass
+from typing import List, Set, Tuple
+
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import (
+    NeonEnvBuilder,
+    PgBin,
+    RemoteStorageKind,
+    last_flush_lsn_upload,
+)
+from fixtures.pageserver.http import LayerMapInfo
+from fixtures.types import TimelineId
+from pytest_httpserver import HTTPServer
+
+# NB: basic config change tests are in test_tenant_conf.py
+
+
+def test_threshold_based_eviction(
+    request,
+    httpserver: HTTPServer,
+    httpserver_listen_address,
+    pg_bin: PgBin,
+    neon_env_builder: NeonEnvBuilder,
+):
+    neon_env_builder.enable_remote_storage(RemoteStorageKind.LOCAL_FS, f"{request.node.name}")
+
+    # Start with metrics collection enabled, so that the eviction task
+    # imitates its accesses. We'll use a non-existent endpoint to make it fail.
+    # The synthetic size calculation will run regardless.
+    host, port = httpserver_listen_address
+    neon_env_builder.pageserver_config_override = f"""
+        metric_collection_interval="1s"
+        synthetic_size_calculation_interval="2s"
+        metric_collection_endpoint="http://{host}:{port}/nonexistent"
+    """
+    metrics_refused_log_line = ".*metrics endpoint refused the sent metrics.*/nonexistent.*"
+    env = neon_env_builder.init_start()
+    env.pageserver.allowed_errors.append(metrics_refused_log_line)
+
+    tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
+    assert isinstance(timeline_id, TimelineId)
+
+    ps_http = env.pageserver.http_client()
+    assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
+        "kind": "NoEviction"
+    }
+
+    eviction_threshold = 5
+    eviction_period = 1
+    ps_http.set_tenant_config(
+        tenant_id,
+        {
+            "eviction_policy": {
+                "kind": "LayerAccessThreshold",
+                "threshold": f"{eviction_threshold}s",
+                "period": f"{eviction_period}s",
+            },
+        },
+    )
+    assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
+        "kind": "LayerAccessThreshold",
+        "threshold": f"{eviction_threshold}s",
+        "period": f"{eviction_period}s",
+    }
+
+    # restart because changing the tenant config is not instant
+    env.pageserver.stop()
+    env.pageserver.start()
+
+    assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
+        "kind": "LayerAccessThreshold",
+        "threshold": f"{eviction_threshold}s",
+        "period": f"{eviction_period}s",
+    }
+
+    # create a bunch of L1s; only a subset of them will need to stay resident
+    compaction_threshold = 3  # create L1 layers quickly
+    ps_http.patch_tenant_config_client_side(
+        tenant_id,
+        inserts={
+            # Disable gc and compaction to avoid on-demand downloads from their side.
+            # The only on-demand downloads should be from the eviction task's "imitate access" functions.
+            "gc_period": "0s",
+            "compaction_period": "0s",
+            # low checkpoint_distance so that pgbench creates many layers
+            "checkpoint_distance": 1024**2,
+            # Low compaction target size to create many L1s with tight key ranges.
+            # This is so that the "imitate accesses" don't download all the layers.
+            "compaction_target_size": 1 * 1024**2,  # all keys into one L1
+            # Turn L0s into L1s fast.
+            "compaction_threshold": compaction_threshold,
+            # Prevent compaction from collapsing L1 delta layers into image layers. We want many layers here.
+            "image_creation_threshold": 100,
+            # Much larger gc_horizon so that the synthetic size calculation worker, which is part of
+            # metric collection, computes the logical size for initdb_lsn every time, instead of some
+            # moving lsn as we insert data. This makes the set of downloaded layers predictable,
+            # thereby allowing the residence statuses to stabilize below.
+ "gc_horizon": 1024**4, + }, + ) + + # create a bunch of layers + with env.endpoints.create_start("main", tenant_id=tenant_id) as pg: + pg_bin.run(["pgbench", "-i", "-s", "3", pg.connstr()]) + last_flush_lsn_upload(env, pg, tenant_id, timeline_id) + # wrap up and shutdown safekeepers so that no more layers will be created after the final checkpoint + for sk in env.safekeepers: + sk.stop() + ps_http.timeline_checkpoint(tenant_id, timeline_id) + + # wait for evictions and assert that they stabilize + @dataclass + class ByLocalAndRemote: + remote_layers: Set[str] + local_layers: Set[str] + + class MapInfoProjection: + def __init__(self, info: LayerMapInfo): + self.info = info + + def by_local_and_remote(self) -> ByLocalAndRemote: + return ByLocalAndRemote( + remote_layers={ + layer.layer_file_name for layer in self.info.historic_layers if layer.remote + }, + local_layers={ + layer.layer_file_name for layer in self.info.historic_layers if not layer.remote + }, + ) + + def __eq__(self, other): + if not isinstance(other, MapInfoProjection): + return False + return self.by_local_and_remote() == other.by_local_and_remote() + + def __repr__(self) -> str: + out = ["MapInfoProjection:"] + for layer in sorted(self.info.historic_layers, key=lambda layer: layer.layer_file_name): + remote = "R" if layer.remote else "L" + out += [f" {remote} {layer.layer_file_name}"] + return "\n".join(out) + + observation_window = 8 * eviction_threshold + consider_stable_when_no_change_for_seconds = 3 * eviction_threshold + poll_interval = eviction_threshold / 3 + started_waiting_at = time.time() + map_info_changes: List[Tuple[float, MapInfoProjection]] = [] + while time.time() - started_waiting_at < observation_window: + current = ( + time.time(), + MapInfoProjection(ps_http.layer_map_info(tenant_id, timeline_id)), + ) + last = map_info_changes[-1] if map_info_changes else (0, None) + if last[1] is None or current[1] != last[1]: + map_info_changes.append(current) + log.info("change in layer map\n before: %s\n after: %s", last, current) + else: + stable_for = current[0] - last[0] + log.info("residencies stable for %s", stable_for) + if stable_for > consider_stable_when_no_change_for_seconds: + break + time.sleep(poll_interval) + + log.info("len(map_info_changes)=%s", len(map_info_changes)) + + # TODO: can we be more precise here? E.g., require we're stable _within_ X*threshold, + # instead of what we do here, i.e., stable _for at least_ X*threshold toward the end of the observation window + assert ( + stable_for > consider_stable_when_no_change_for_seconds + ), "layer residencies did not become stable within the observation window" + + post = map_info_changes[-1][1].by_local_and_remote() + assert len(post.remote_layers) > 0, "some layers should be evicted once it's stabilized" + assert len(post.local_layers) > 0, "the imitate accesses should keep some layers resident" + + assert env.pageserver.log_contains( + metrics_refused_log_line + ), "ensure the metrics collection worker ran"