Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eviction: regression test + distinguish layer write from map insert #4005

Merged
merged 17 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
d53c33c
eviction: regression test + distinguish layer write from map insert
problame Apr 11, 2023
ed29e17
refactor: simplify MapInfoProjection type & more preceise asserts at …
problame Apr 11, 2023
409b0f3
feat: technically the same applies for Timeline::download_remote_layer
problame Apr 13, 2023
30a77e0
Revert "feat: technically the same applies for Timeline::download_rem…
problame Apr 13, 2023
767c57f
feat: alternative approach that takes a witness for layer map lock held
problame Apr 13, 2023
a55987a
fixup rebase: eviction: env.postgres was renamed to env.endpoints
problame Apr 14, 2023
c2f5a4d
fix doc string in "feat: feat: alternative approach that takes a witn…
problame Apr 14, 2023
f480844
feat: rate-limit logging + centralize in the latest_activity method f…
problame Apr 14, 2023
63e5993
fix: use Mutex::default()
problame Apr 14, 2023
8de967c
feat: forgot to commit rate_limit_closure module
problame Apr 14, 2023
40ef242
apply Joonas's suggestion
problame May 4, 2023
da19e64
rename rate_limit_closure => rate_limit / RateLimitClosure=>RateLimit
problame May 4, 2023
0233cca
better way than tokio::spawn for logging at global scope
problame May 4, 2023
fedaa65
latest_activity(): return Option and use unwrap_or{,_else} in the cal…
problame May 4, 2023
b711449
use SystemTime::now() instead of SystemTime::UNIX_EPOCH as fallback
problame May 4, 2023
3f5de7c
Merge remote-tracking branch 'origin/main' into problame/threshold-ev…
problame May 4, 2023
9b41b5f
fix merge fallout
problame May 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions libs/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub mod pageserver_feedback;

pub mod tracing_span_assert;

pub mod rate_limit;

/// use with fail::cfg("$name", "return(2000)")
#[macro_export]
macro_rules! failpoint_sleep_millis_async {
Expand Down
66 changes: 66 additions & 0 deletions libs/utils/src/rate_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//! A helper to rate limit operations.

use std::time::{Duration, Instant};

pub struct RateLimit {
last: Option<Instant>,
interval: Duration,
}

impl RateLimit {
pub fn new(interval: Duration) -> Self {
Self {
last: None,
interval,
}
}

/// Call `f` if the rate limit allows.
/// Don't call it otherwise.
pub fn call<F: FnOnce()>(&mut self, f: F) {
let now = Instant::now();
match self.last {
Some(last) if now - last <= self.interval => {
// ratelimit
}
_ => {
self.last = Some(now);
f();
}
}
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicUsize;

#[test]
fn basics() {
use super::RateLimit;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;

let called = AtomicUsize::new(0);
let mut f = RateLimit::new(Duration::from_millis(100));

let cl = || {
called.fetch_add(1, Relaxed);
};

f.call(cl);
assert_eq!(called.load(Relaxed), 1);
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
std::thread::sleep(Duration::from_millis(100));
f.call(cl);
assert_eq!(called.load(Relaxed), 2);
f.call(cl);
assert_eq!(called.load(Relaxed), 2);
std::thread::sleep(Duration::from_millis(100));
f.call(cl);
assert_eq!(called.load(Relaxed), 3);
}
}
116 changes: 87 additions & 29 deletions pageserver/src/tenant/storage_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@ use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use bytes::Bytes;
use either::Either;
use enum_map::EnumMap;
use enumset::EnumSet;
use once_cell::sync::Lazy;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{
HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
use utils::history_buffer::HistoryBufferWithDropCounter;
use utils::rate_limit::RateLimit;

use utils::{
id::{TenantId, TimelineId},
Expand All @@ -37,6 +39,8 @@ pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use remote_layer::RemoteLayer;

use super::layer_map::BatchedUpdates;

pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
T: PartialOrd<T>,
Expand Down Expand Up @@ -158,41 +162,82 @@ impl LayerAccessStatFullDetails {
}

impl LayerAccessStats {
pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
new
/// Create an empty stats object.
///
/// The caller is responsible for recording a residence event
/// using [`record_residence_event`] before calling `latest_activity`.
/// If they don't, [`latest_activity`] will return `None`.
pub(crate) fn empty_will_record_residence_event_later() -> Self {
LayerAccessStats(Mutex::default())
}

pub(crate) fn for_new_layer_file() -> Self {
/// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn for_loading_layer<L>(
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
status: LayerResidenceStatus,
) -> Self
where
L: ?Sized + Layer,
{
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
layer_map_lock_held_witness,
status,
LayerResidenceEventReason::LayerLoad,
);
new
}

/// Creates a clone of `self` and records `new_status` in the clone.
/// The `new_status` is not recorded in `self`
pub(crate) fn clone_for_residence_change(
///
/// The `new_status` is not recorded in `self`.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn clone_for_residence_change<L>(
&self,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
new_status: LayerResidenceStatus,
) -> LayerAccessStats {
) -> LayerAccessStats
where
L: ?Sized + Layer,
{
let clone = {
let inner = self.0.lock().unwrap();
inner.clone()
};
let new = LayerAccessStats(Mutex::new(clone));
new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
new.record_residence_event(
layer_map_lock_held_witness,
new_status,
LayerResidenceEventReason::ResidenceChange,
);
new
}

fn record_residence_event(
/// Record a change in layer residency.
///
/// Recording the event must happen while holding the layer map lock to
/// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
/// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
///
/// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
/// the following race could happen:
///
/// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
/// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
/// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
/// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
///
pub(crate) fn record_residence_event<L>(
&self,
_layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
status: LayerResidenceStatus,
reason: LayerResidenceEventReason,
) {
) where
L: ?Sized + Layer,
{
let mut locked = self.0.lock().unwrap();
locked.iter_mut().for_each(|inner| {
inner
Expand Down Expand Up @@ -255,24 +300,37 @@ impl LayerAccessStats {
ret
}

fn most_recent_access_or_residence_event(
&self,
) -> Either<LayerAccessStatFullDetails, LayerResidenceEvent> {
/// Get the latest access timestamp, falling back to latest residence event.
///
/// This function can only return `None` if there has not yet been a call to the
/// [`record_residence_event`] method. That would generally be considered an
/// implementation error. This function logs a rate-limited warning in that case.
///
/// TODO: use type system to avoid the need for `fallback`.
/// The approach in https://github.com/neondatabase/neon/pull/3775
/// could be used to enforce that a residence event is recorded
/// before a layer is added to the layer map. We could also have
/// a layer wrapper type that holds the LayerAccessStats, and ensure
/// that that type can only be produced by inserting into the layer map.
pub(crate) fn latest_activity(&self) -> Option<SystemTime> {
let locked = self.0.lock().unwrap();
let inner = &locked.for_eviction_policy;
match inner.last_accesses.recent() {
Some(a) => Either::Left(*a),
Some(a) => Some(a.when),
None => match inner.last_residence_changes.recent() {
Some(e) => Either::Right(e.clone()),
None => unreachable!("constructors for LayerAccessStats ensure that there's always a residence change event"),
}
}
}

pub(crate) fn latest_activity(&self) -> SystemTime {
match self.most_recent_access_or_residence_event() {
Either::Left(mra) => mra.when,
Either::Right(re) => re.timestamp,
Some(e) => Some(e.timestamp),
None => {
static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> =
Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10)))));
let mut guard = WARN_RATE_LIMIT.lock().unwrap();
guard.0 += 1;
let occurences = guard.0;
guard.1.call(move || {
warn!(parent: None, occurences, "latest_activity not available, this is an implementation bug, using fallback value");
});
None
}
},
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use utils::{

use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, LayerResidenceStatus, PathOrConf,
LayerKeyIter, PathOrConf,
};

///
Expand Down Expand Up @@ -637,7 +637,7 @@ impl DeltaLayer {
key_range: summary.key_range,
lsn_range: summary.lsn_range,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
Expand Down Expand Up @@ -808,7 +808,7 @@ impl DeltaLayerWriterInner {
key_range: self.key_start..key_end,
lsn_range: self.lsn_range.clone(),
file_size: metadata.len(),
access_stats: LayerAccessStats::for_new_layer_file(),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
Expand Down
6 changes: 3 additions & 3 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use utils::{
};

use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerAccessStatsReset, LayerIter, LayerResidenceStatus, PathOrConf};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};

///
/// Header stored in the beginning of the file
Expand Down Expand Up @@ -438,7 +438,7 @@ impl ImageLayer {
key_range: summary.key_range,
lsn: summary.lsn,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(ImageLayerInner {
file: None,
loaded: false,
Expand Down Expand Up @@ -598,7 +598,7 @@ impl ImageLayerWriterInner {
key_range: self.key_range.clone(),
lsn: self.lsn,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_new_layer_file(),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(ImageLayerInner {
loaded: false,
file: None,
Expand Down
21 changes: 15 additions & 6 deletions pageserver/src/tenant/storage_layer/remote_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use anyhow::{bail, Result};
Expand Down Expand Up @@ -246,11 +247,15 @@ impl RemoteLayer {
}

/// Create a Layer struct representing this layer, after it has been downloaded.
pub fn create_downloaded_layer(
pub fn create_downloaded_layer<L>(
&self,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
conf: &'static PageServerConf,
file_size: u64,
) -> Arc<dyn PersistentLayer> {
) -> Arc<dyn PersistentLayer>
where
L: ?Sized + Layer,
{
if self.is_delta {
let fname = DeltaFileName {
key_range: self.key_range.clone(),
Expand All @@ -262,8 +267,10 @@ impl RemoteLayer {
self.tenantid,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
self.access_stats.clone_for_residence_change(
layer_map_lock_held_witness,
LayerResidenceStatus::Resident,
),
))
} else {
let fname = ImageFileName {
Expand All @@ -276,8 +283,10 @@ impl RemoteLayer {
self.tenantid,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
self.access_stats.clone_for_residence_change(
layer_map_lock_held_witness,
LayerResidenceStatus::Resident,
),
))
}
}
Expand Down
Loading