diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 9cdcf3a17391..48657f9dbc02 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -241,6 +241,7 @@ pub struct LayerMapInfo { #[repr(usize)] pub enum LayerAccessKind { GetValueReconstructData, + ExtractHoles, Iter, KeyIter, Dump, diff --git a/pageserver/benches/bench_layer_map.rs b/pageserver/benches/bench_layer_map.rs index 5edfa84d8a2a..44a66031c698 100644 --- a/pageserver/benches/bench_layer_map.rs +++ b/pageserver/benches/bench_layer_map.rs @@ -1,5 +1,7 @@ +use pageserver::context::{DownloadBehavior, RequestContext}; use pageserver::keyspace::{KeyPartitioning, KeySpace}; use pageserver::repository::Key; +use pageserver::task_mgr::TaskKind; use pageserver::tenant::layer_map::LayerMap; use pageserver::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName}; use rand::prelude::{SeedableRng, SliceRandom, StdRng}; @@ -16,6 +18,7 @@ use utils::lsn::Lsn; use criterion::{criterion_group, criterion_main, Criterion}; fn build_layer_map(filename_dump: PathBuf) -> LayerMap { + let ctx = RequestContext::new(TaskKind::Benchmark, DownloadBehavior::Error); let mut layer_map = LayerMap::::default(); let mut min_lsn = Lsn(u64::MAX); @@ -33,7 +36,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap { min_lsn = min(min_lsn, lsn_range.start); max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1)); - updates.insert_historic(Arc::new(layer)); + updates.insert_historic(Arc::new(layer), &ctx).unwrap(); } println!("min: {min_lsn}, max: {max_lsn}"); @@ -135,6 +138,7 @@ fn bench_from_captest_env(c: &mut Criterion) { // Benchmark using metadata extracted from a real project that was taknig // too long processing layer map queries. 
fn bench_from_real_project(c: &mut Criterion) { + let ctx = RequestContext::new(TaskKind::Benchmark, DownloadBehavior::Error); // Init layer map let now = Instant::now(); let layer_map = build_layer_map(PathBuf::from("benches/odd-brook-layernames.txt")); @@ -157,12 +161,13 @@ fn bench_from_real_project(c: &mut Criterion) { println!("running correctness check"); let now = Instant::now(); - let result_bruteforce = layer_map.get_difficulty_map_bruteforce(latest_lsn, &partitioning); + let result_bruteforce = + layer_map.get_difficulty_map_bruteforce(latest_lsn, &partitioning, &ctx); assert!(result_bruteforce.len() == partitioning.parts.len()); println!("Finished bruteforce in {:?}", now.elapsed()); let now = Instant::now(); - let result_fast = layer_map.get_difficulty_map(latest_lsn, &partitioning, None); + let result_fast = layer_map.get_difficulty_map(latest_lsn, &partitioning, None, &ctx); assert!(result_fast.len() == partitioning.parts.len()); println!("Finished fast in {:?}", now.elapsed()); @@ -189,7 +194,7 @@ fn bench_from_real_project(c: &mut Criterion) { }); group.bench_function("get_difficulty_map", |b| { b.iter(|| { - layer_map.get_difficulty_map(latest_lsn, &partitioning, Some(3)); + layer_map.get_difficulty_map(latest_lsn, &partitioning, Some(3), &ctx); }); }); group.finish(); @@ -197,6 +202,7 @@ fn bench_from_real_project(c: &mut Criterion) { // Benchmark using synthetic data. Arrange image layers on stacked diagonal lines. fn bench_sequential(c: &mut Criterion) { + let ctx = RequestContext::new(TaskKind::Benchmark, DownloadBehavior::Error); // Init layer map. Create 100_000 layers arranged in 1000 diagonal lines. 
// // TODO This code is pretty slow and runs even if we're only running other @@ -206,7 +212,7 @@ fn bench_sequential(c: &mut Criterion) { let now = Instant::now(); let mut layer_map = LayerMap::default(); let mut updates = layer_map.batch_update(); - for i in 0..100_000 { + for i in 1..100_000 { let i32 = (i as u32) % 100; let zero = Key::from_hex("000000000000000000000000000000000000").unwrap(); let layer = LayerDescriptor { @@ -215,7 +221,7 @@ fn bench_sequential(c: &mut Criterion) { is_incremental: false, short_id: format!("Layer {}", i), }; - updates.insert_historic(Arc::new(layer)); + updates.insert_historic(Arc::new(layer), &ctx).unwrap(); } updates.flush(); println!("Finished layer map init in {:?}", now.elapsed()); diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 229cf96ee37b..39995f9c7ccc 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -570,6 +570,7 @@ async fn layer_download_handler(request: Request) -> Result } async fn evict_timeline_layer_handler(request: Request) -> Result, ApiError> { + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error); let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; @@ -577,7 +578,7 @@ async fn evict_timeline_layer_handler(request: Request) -> Result { /// /// The new timeline is initialized in Active state, and its background jobs are /// started - pub fn initialize(self, _ctx: &RequestContext) -> anyhow::Result> { + pub fn initialize(self, ctx: &RequestContext) -> anyhow::Result> { let mut timelines = self.owning_tenant.timelines.lock().unwrap(); - self.initialize_with_lock(&mut timelines, true, true) + self.initialize_with_lock(&mut timelines, true, true, ctx) } /// Like `initialize`, but the caller is already holding lock on Tenant::timelines. 
@@ -191,6 +191,7 @@ impl UninitializedTimeline<'_> { timelines: &mut HashMap>, load_layer_map: bool, activate: bool, + ctx: &RequestContext, ) -> anyhow::Result> { let timeline_id = self.timeline_id; let tenant_id = self.owning_tenant.tenant_id; @@ -211,7 +212,7 @@ impl UninitializedTimeline<'_> { Entry::Vacant(v) => { if load_layer_map { new_timeline - .load_layer_map(new_disk_consistent_lsn) + .load_layer_map(new_disk_consistent_lsn, ctx) .with_context(|| { format!( "Failed to load layermap for timeline {tenant_id}/{timeline_id}" @@ -459,7 +460,7 @@ impl Tenant { local_metadata: Option, ancestor: Option>, first_save: bool, - _ctx: &RequestContext, + ctx: &RequestContext, ) -> anyhow::Result<()> { let tenant_id = self.tenant_id; @@ -494,7 +495,7 @@ impl Tenant { // Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote // But we shouldnt start walreceiver before we have all the data locally, because working walreceiver // will ingest data which may require looking at the layers which are not yet available locally - match timeline.initialize_with_lock(&mut timelines_accessor, true, false) { + match timeline.initialize_with_lock(&mut timelines_accessor, true, false, ctx) { Ok(new_timeline) => new_timeline, Err(e) => { error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}"); @@ -528,6 +529,7 @@ impl Tenant { .reconcile_with_remote( up_to_date_metadata, remote_startup_data.as_ref().map(|r| &r.index_part), + ctx, ) .await .context("failed to reconcile with remote")? @@ -1954,7 +1956,7 @@ impl Tenant { // made. break; } - let result = timeline.gc().await?; + let result = timeline.gc(ctx).await?; totals += result; } @@ -2078,7 +2080,7 @@ impl Tenant { src_timeline: &Arc, dst_id: TimelineId, start_lsn: Option, - _ctx: &RequestContext, + ctx: &RequestContext, ) -> anyhow::Result> { let src_id = src_timeline.timeline_id; @@ -2171,7 +2173,7 @@ impl Tenant { false, Some(Arc::clone(src_timeline)), )? 
- .initialize_with_lock(&mut timelines, true, true)?; + .initialize_with_lock(&mut timelines, true, true, ctx)?; drop(timelines); info!("branched timeline {dst_id} from {src_id} at {start_lsn}"); @@ -2272,7 +2274,7 @@ impl Tenant { let timeline = { let mut timelines = self.timelines.lock().unwrap(); - raw_timeline.initialize_with_lock(&mut timelines, false, true)? + raw_timeline.initialize_with_lock(&mut timelines, false, true, ctx)? }; info!( @@ -3426,7 +3428,7 @@ mod tests { .await?; tline.freeze_and_flush().await?; tline.compact(&ctx).await?; - tline.gc().await?; + tline.gc(&ctx).await?; } Ok(()) @@ -3498,7 +3500,7 @@ mod tests { .await?; tline.freeze_and_flush().await?; tline.compact(&ctx).await?; - tline.gc().await?; + tline.gc(&ctx).await?; } Ok(()) @@ -3582,7 +3584,7 @@ mod tests { .await?; tline.freeze_and_flush().await?; tline.compact(&ctx).await?; - tline.gc().await?; + tline.gc(&ctx).await?; } Ok(()) diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 9d8c825220d5..fe7101885c2f 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -53,7 +53,8 @@ use crate::repository::Key; use crate::tenant::storage_layer::InMemoryLayer; use crate::tenant::storage_layer::Layer; use anyhow::Result; -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; +use std::hash::{Hash, Hasher}; use std::ops::Range; use std::sync::Arc; use utils::lsn::Lsn; @@ -63,6 +64,45 @@ pub use historic_layer_coverage::Replacement; use super::storage_layer::range_eq; +pub fn compare_arced_layers(left: &Arc, right: &Arc) -> bool { + // "dyn Trait" objects are "fat pointers" in that they have two components: + // - pointer to the object + // - pointer to the vtable + // + // rust does not provide a guarantee that these vtables are unique, but however + // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the + // pointer and the vtable need to be equal. 
+ // + // See: https://github.com/rust-lang/rust/issues/103763 + // + // A future version of rust will most likely use this form below, where we cast each + // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it + // not affect the comparison. + // + // See: https://github.com/rust-lang/rust/pull/106450 + let left = Arc::as_ptr(left) as *const (); + let right = Arc::as_ptr(right) as *const (); + + left == right +} + +struct LayerRef(Arc); + +impl PartialEq for LayerRef { + fn eq(&self, other: &LayerRef) -> bool { + compare_arced_layers(&self.0, &other.0) + } +} + +impl Eq for LayerRef {} + +impl Hash for LayerRef { + fn hash(&self, state: &mut H) { + let ptr = Arc::as_ptr(&self.0) as *const (); + ptr.hash(state) + } +} + /// /// LayerMap tracks what layers exist on a timeline. /// @@ -89,9 +129,11 @@ pub struct LayerMap { /// Index of the historic layers optimized for search historic: BufferedHistoricLayerCoverage>, - /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient. - /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree. - l0_delta_layers: Vec>, + /// L0 layers are kept in separate set because we need fast way to iterate through L0 layers in compaction + l0_delta_layers: HashSet>, + + /// `historic` tree may contain maultuple reference to one layer, so lets maintain separate set instead of eliminating duplicates + historic_layers: HashSet>, } impl Default for LayerMap { @@ -100,8 +142,9 @@ impl Default for LayerMap { open_layer: None, next_open_layer_at: None, frozen_layers: VecDeque::default(), - l0_delta_layers: Vec::default(), historic: BufferedHistoricLayerCoverage::default(), + l0_delta_layers: HashSet::default(), + historic_layers: HashSet::default(), } } } @@ -126,8 +169,8 @@ where /// /// Insert an on-disk layer. 
/// - pub fn insert_historic(&mut self, layer: Arc) { - self.layer_map.insert_historic_noflush(layer) + pub fn insert_historic(&mut self, layer: Arc, ctx: &RequestContext) -> Result<()> { + self.layer_map.insert_historic_noflush(layer, ctx) } /// @@ -135,8 +178,9 @@ where /// /// This should be called when the corresponding file on disk has been deleted. /// - pub fn remove_historic(&mut self, layer: Arc) { - self.layer_map.remove_historic_noflush(layer) + + pub fn remove_historic(&mut self, layer: Arc, ctx: &RequestContext) -> Result<()> { + self.layer_map.remove_historic_noflush(layer, ctx) } /// Replaces existing layer iff it is the `expected`. @@ -153,8 +197,9 @@ where &mut self, expected: &Arc, new: Arc, - ) -> anyhow::Result>> { - self.layer_map.replace_historic_noflush(expected, new) + ctx: &RequestContext, + ) -> anyhow::Result { + self.layer_map.replace_historic_noflush(expected, new, ctx) } // We will flush on drop anyway, but this method makes it @@ -272,17 +317,28 @@ where /// /// Helper function for BatchedUpdates::insert_historic /// - pub(self) fn insert_historic_noflush(&mut self, layer: Arc) { - self.historic.insert( - historic_layer_coverage::LayerKey::from(&*layer), - Arc::clone(&layer), - ); - + pub(self) fn insert_historic_noflush( + &mut self, + layer: Arc, + ctx: &RequestContext, + ) -> Result<()> { + let lr = layer.get_lsn_range(); + for kr in layer.get_occupied_ranges(ctx)? 
{ + self.historic.insert( + historic_layer_coverage::LayerKey { + key: kr.start.to_i128()..kr.end.to_i128(), + lsn: lr.start.0..lr.end.0, + is_image: !layer.is_incremental(), + }, + Arc::clone(&layer), + ); + } if Self::is_l0(&layer) { - self.l0_delta_layers.push(layer); + self.l0_delta_layers.insert(LayerRef(layer.clone())); } - + self.historic_layers.insert(LayerRef(layer)); NUM_ONDISK_LAYERS.inc(); + Ok(()) } /// @@ -290,32 +346,30 @@ where /// /// Helper function for BatchedUpdates::remove_historic /// - pub fn remove_historic_noflush(&mut self, layer: Arc) { - self.historic - .remove(historic_layer_coverage::LayerKey::from(&*layer)); + pub fn remove_historic_noflush(&mut self, layer: Arc, ctx: &RequestContext) -> Result<()> { + let lr = layer.get_lsn_range(); + for kr in layer.get_occupied_ranges(ctx)? { + self.historic.remove(historic_layer_coverage::LayerKey { + key: kr.start.to_i128()..kr.end.to_i128(), + lsn: lr.start.0..lr.end.0, + is_image: !layer.is_incremental(), + }); + } if Self::is_l0(&layer) { - let len_before = self.l0_delta_layers.len(); - self.l0_delta_layers - .retain(|other| !Self::compare_arced_layers(other, &layer)); - // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers, - // there's a chance that the comparison fails at runtime due to it comparing (pointer, - // vtable) pairs. 
- assert_eq!( - self.l0_delta_layers.len(), - len_before - 1, - "failed to locate removed historic layer from l0_delta_layers" - ); + assert!(self.l0_delta_layers.remove(&LayerRef(layer.clone()))); } - + assert!(self.historic_layers.remove(&LayerRef(layer))); NUM_ONDISK_LAYERS.dec(); + Ok(()) } pub(self) fn replace_historic_noflush( &mut self, expected: &Arc, new: Arc, - ) -> anyhow::Result>> { + ctx: &RequestContext, + ) -> anyhow::Result { let key = historic_layer_coverage::LayerKey::from(&**expected); let other = historic_layer_coverage::LayerKey::from(&*new); @@ -332,28 +386,67 @@ where "expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}" ); - let l0_index = if expected_l0 { - // find the index in case replace worked, we need to replace that as well - Some( - self.l0_delta_layers - .iter() - .position(|slot| Self::compare_arced_layers(slot, expected)) - .ok_or_else(|| anyhow::anyhow!("existing l0 delta layer was not found"))?, - ) + let lr = expected.get_lsn_range(); + let replaced = self.historic_layers.remove(&LayerRef(expected.clone())); + if replaced { + for kr in expected.get_occupied_ranges(ctx)? { + match self.historic.replace( + &historic_layer_coverage::LayerKey { + key: kr.start.to_i128()..kr.end.to_i128(), + lsn: lr.start.0..lr.end.0, + is_image: !expected.is_incremental(), + }, + new.clone(), + |existing| compare_arced_layers(existing, expected), + ) { + Replacement::Replaced { .. } => { /* expected */ } + Replacement::NotFound => { + // TODO: the downloaded file should probably be removed, otherwise + // it will be added to the layermap on next load? we should + // probably restart any get_reconstruct_data search as well. 
+ // + // See: https://github.com/neondatabase/neon/issues/3533 + anyhow::bail!("replacing downloaded layer into layermap failed because layer was not found"); + } + Replacement::RemovalBuffered => { + unreachable!("current implementation does not remove anything") + } + Replacement::Unexpected(_other) => { + // if the other layer would have the same pointer value as + // expected, it means they differ only on vtables. + // + // otherwise there's no known reason for this to happen as + // compacted layers should have different covering rectangle + // leading to produce Replacement::NotFound. + + anyhow::bail!( + "replacing downloaded layer into layermap failed because another layer was found instead of expected" + ); + } + }; + } + if expected_l0 { + anyhow::ensure!( + self.l0_delta_layers.remove(&LayerRef(expected.clone())), + "existing l0 delta layer was not found" + ); + } } else { - None - }; - - let replaced = self.historic.replace(&key, new.clone(), |existing| { - Self::compare_arced_layers(existing, expected) - }); - - if let Replacement::Replaced { .. } = &replaced { - if let Some(index) = l0_index { - self.l0_delta_layers[index] = new; + for kr in new.get_occupied_ranges(ctx)? { + self.historic.insert( + historic_layer_coverage::LayerKey { + key: kr.start.to_i128()..kr.end.to_i128(), + lsn: lr.start.0..lr.end.0, + is_image: !new.is_incremental(), + }, + Arc::clone(&new), + ); } } - + if expected_l0 { + self.l0_delta_layers.insert(LayerRef(new.clone())); + } + self.historic_layers.insert(LayerRef(new)); Ok(replaced) } @@ -403,7 +496,7 @@ where } pub fn iter_historic_layers(&self) -> impl '_ + Iterator> { - self.historic.iter() + self.historic_layers.iter().map(|r| r.0.clone()) } /// @@ -473,18 +566,25 @@ where /// TODO The optimal number should probably be slightly higher than 1, but to /// implement that we need to plumb a lot more context into this function /// than just the current partition_range. 
- pub fn is_reimage_worthy(layer: &L, partition_range: &Range) -> bool { + pub fn is_reimage_worthy( + layer: &L, + partition_range: &Range, + ctx: &RequestContext, + ) -> Result { + if !layer.overlaps(partition_range, ctx)? { + return Ok(false); + } // Case 1 if !Self::is_l0(layer) { - return true; + return Ok(true); } // Case 2 if range_eq(partition_range, &(Key::MIN..Key::MAX)) { - return true; + return Ok(true); } - false + Ok(false) } /// Count the height of the tallest stack of reimage-worthy deltas @@ -500,6 +600,7 @@ where key: &Range, lsn: &Range, limit: Option, + ctx: &RequestContext, ) -> Result { // We get the delta coverage of the region, and for each part of the coverage // we recurse right underneath the delta. The recursion depth is limited by @@ -531,10 +632,10 @@ where let kr = Key::from_i128(current_key)..Key::from_i128(change_key); let lr = lsn.start..val.get_lsn_range().start; if !kr.is_empty() { - let base_count = Self::is_reimage_worthy(&val, key) as usize; + let base_count = Self::is_reimage_worthy(&val, key, ctx)? as usize; let new_limit = limit.map(|l| l - base_count); let max_stacked_deltas_underneath = - self.count_deltas(&kr, &lr, new_limit)?; + self.count_deltas(&kr, &lr, new_limit, ctx)?; max_stacked_deltas = std::cmp::max( max_stacked_deltas, base_count + max_stacked_deltas_underneath, @@ -542,7 +643,6 @@ where } } } - current_key = change_key; current_val = change_val.clone(); } @@ -554,9 +654,10 @@ where let lr = lsn.start..val.get_lsn_range().start; if !kr.is_empty() { - let base_count = Self::is_reimage_worthy(&val, key) as usize; + let base_count = Self::is_reimage_worthy(&val, key, ctx)? 
as usize; let new_limit = limit.map(|l| l - base_count); - let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?; + let max_stacked_deltas_underneath = + self.count_deltas(&kr, &lr, new_limit, ctx)?; max_stacked_deltas = std::cmp::max( max_stacked_deltas, base_count + max_stacked_deltas_underneath, @@ -573,12 +674,19 @@ where /// The `partition_range` argument is used as context for the reimage-worthiness decision. /// /// Used as a helper for correctness checks only. Performance not critical. - pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range) -> usize { + pub fn get_difficulty( + &self, + lsn: Lsn, + key: Key, + partition_range: &Range, + ctx: &RequestContext, + ) -> usize { match self.search(key, lsn) { Some(search_result) => { if search_result.layer.is_incremental() { - (Self::is_reimage_worthy(&search_result.layer, partition_range) as usize) - + self.get_difficulty(search_result.lsn_floor, key, partition_range) + (Self::is_reimage_worthy(&search_result.layer, partition_range, ctx).unwrap() + as usize) + + self.get_difficulty(search_result.lsn_floor, key, partition_range, ctx) } else { 0 } @@ -593,6 +701,7 @@ where &self, lsn: Lsn, partitioning: &KeyPartitioning, + ctx: &RequestContext, ) -> Vec { // Looking at the difficulty as a function of key, it could only increase // when a delta layer starts or an image layer ends. 
Therefore it's sufficient @@ -627,8 +736,10 @@ where // TODO assert it for range in &part.ranges { if !range.is_empty() { - difficulty = - std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range)); + difficulty = std::cmp::max( + difficulty, + self.get_difficulty(lsn, range.start, range, ctx), + ); } while let Some(key) = keys_iter.peek() { if key >= &range.end { @@ -639,7 +750,7 @@ where continue; } difficulty = - std::cmp::max(difficulty, self.get_difficulty(lsn, key, range)); + std::cmp::max(difficulty, self.get_difficulty(lsn, key, range, ctx)); } } difficulty @@ -664,6 +775,7 @@ where lsn: Lsn, partitioning: &KeyPartitioning, limit: Option, + ctx: &RequestContext, ) -> Vec { // TODO This is a naive implementation. Perf improvements to do: // 1. Instead of calling self.image_coverage and self.count_deltas, @@ -692,7 +804,7 @@ where if img_lsn < lsn { let num_deltas = self - .count_deltas(&img_range, &(img_lsn..lsn), limit) + .count_deltas(&img_range, &(img_lsn..lsn), limit, ctx) .expect("why would this err lol?"); difficulty = std::cmp::max(difficulty, num_deltas); } @@ -705,7 +817,7 @@ where /// Return all L0 delta layers pub fn get_level0_deltas(&self) -> Result>> { - Ok(self.l0_delta_layers.clone()) + Ok(self.l0_delta_layers.iter().map(|r| r.0.clone()).collect()) } /// debugging function to print out the contents of the layer map @@ -730,34 +842,14 @@ where println!("End dump LayerMap"); Ok(()) } - - #[inline(always)] - fn compare_arced_layers(left: &Arc, right: &Arc) -> bool { - // "dyn Trait" objects are "fat pointers" in that they have two components: - // - pointer to the object - // - pointer to the vtable - // - // rust does not provide a guarantee that these vtables are unique, but however - // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the - // pointer and the vtable need to be equal. 
- // - // See: https://github.com/rust-lang/rust/issues/103763 - // - // A future version of rust will most likely use this form below, where we cast each - // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it - // not affect the comparison. - // - // See: https://github.com/rust-lang/rust/pull/106450 - let left = Arc::as_ptr(left) as *const (); - let right = Arc::as_ptr(right) as *const (); - - left == right - } } #[cfg(test)] mod tests { - use super::{LayerMap, Replacement}; + use super::LayerMap; + use crate::context::{DownloadBehavior, RequestContext}; + use crate::task_mgr::TaskKind; + use crate::tenant::layer_map::compare_arced_layers; use crate::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName}; use std::str::FromStr; use std::sync::Arc; @@ -796,6 +888,7 @@ mod tests { } fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); let name = LayerFileName::from_str(layer_name).unwrap(); let skeleton = LayerDescriptor::from(name); @@ -805,38 +898,38 @@ mod tests { let mut map = LayerMap::default(); // two disjoint Arcs in different lifecycle phases. - assert!(!LayerMap::compare_arced_layers(&remote, &downloaded)); + assert!(!compare_arced_layers(&remote, &downloaded)); let expected_in_counts = (1, usize::from(expected_l0)); - map.batch_update().insert_historic(remote.clone()); + map.batch_update() + .insert_historic(remote.clone(), &ctx) + .expect("historic layer is inserted"); assert_eq!(count_layer_in(&map, &remote), expected_in_counts); - let replaced = map + assert!(map .batch_update() - .replace_historic(&remote, downloaded.clone()) - .expect("name derived attributes are the same"); - assert!( - matches!(replaced, Replacement::Replaced { .. 
}), - "{replaced:?}" - ); + .replace_historic(&remote, downloaded.clone(), &ctx) + .expect("name derived attributes are the same")); assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts); - map.batch_update().remove_historic(downloaded.clone()); + map.batch_update() + .remove_historic(downloaded.clone(), &ctx) + .expect("downloaded layer is found"); assert_eq!(count_layer_in(&map, &downloaded), (0, 0)); } fn count_layer_in(map: &LayerMap, layer: &Arc) -> (usize, usize) { let historic = map .iter_historic_layers() - .filter(|x| LayerMap::compare_arced_layers(x, layer)) + .filter(|x| compare_arced_layers(x, layer)) .count(); let l0s = map .get_level0_deltas() .expect("why does this return a result"); let l0 = l0s .iter() - .filter(|x| LayerMap::compare_arced_layers(x, layer)) + .filter(|x| compare_arced_layers(x, layer)) .count(); (historic, l0) diff --git a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs index b63c36131465..03eb860e56e3 100644 --- a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs +++ b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs @@ -521,17 +521,6 @@ impl BufferedHistoricLayerCoverage { ) } - /// Iterate all the layers - pub fn iter(&self) -> impl '_ + Iterator { - // NOTE we can actually perform this without rebuilding, - // but it's not necessary for now. - if !self.buffer.is_empty() { - panic!("rebuild pls") - } - - self.layers.values().cloned() - } - /// Return a reference to a queryable map, assuming all updates /// have already been processed using self.rebuild() pub fn get(&self) -> anyhow::Result<&HistoricLayerCoverage> { diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 985b480a764c..cf5ed9226427 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -419,33 +419,6 @@ impl RemoteTimelineClient { .await? 
}; - // Update the metadata for given layer file. The remote index file - // might be missing some information for the file; this allows us - // to fill in the missing details. - if layer_metadata.file_size().is_none() { - let new_metadata = LayerFileMetadata::new(downloaded_size); - let mut guard = self.upload_queue.lock().unwrap(); - let upload_queue = guard.initialized_mut()?; - if let Some(upgraded) = upload_queue.latest_files.get_mut(layer_file_name) { - if upgraded.merge(&new_metadata) { - upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1; - } - // If we don't do an index file upload inbetween here and restart, - // the value will go back down after pageserver restart, since we will - // have lost this data point. - // But, we upload index part fairly frequently, and restart pageserver rarely. - // So, by accounting eagerly, we present a most-of-the-time-more-accurate value sooner. - self.metrics - .remote_physical_size_gauge() - .add(downloaded_size); - } else { - // The file should exist, since we just downloaded it. - warn!( - "downloaded file {:?} not found in local copy of the index file", - layer_file_name - ); - } - } Ok(downloaded_size) } @@ -453,6 +426,40 @@ impl RemoteTimelineClient { // Upload operations. // + /// + /// Upgrade layer metadata + /// + pub async fn upgrade_layer_metadata( + &self, + layer_file_name: &LayerFileName, + old_metadata: &LayerFileMetadata, + new_metadata: &LayerFileMetadata, + ) { + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut().unwrap(); + if let Some(upgraded) = upload_queue.latest_files.get_mut(layer_file_name) { + if upgraded.merge(new_metadata) { + upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1; + } + // If we don't do an index file upload inbetween here and restart, + // the value will go back down after pageserver restart, since we will + // have lost this data point. 
+ // But, we upload index part fairly frequently, and restart pageserver rarely. + // So, by accounting eagerly, we present a most-of-the-time-more-accurate value sooner. + if old_metadata.file_size().is_none() { + self.metrics + .remote_physical_size_gauge() + .add(new_metadata.file_size().unwrap_or(0)); + } + } else { + // The file should exist, since we just downloaded it. + warn!( + "downloaded file {:?} not found in local copy of the index file", + layer_file_name + ); + } + } + /// /// Launch an index-file upload operation in the background, with /// updated metadata. @@ -1153,11 +1160,11 @@ mod tests { client.schedule_layer_file_upload( &layer_file_name_1, - &LayerFileMetadata::new(content_1.len() as u64), + &LayerFileMetadata::new(content_1.len() as u64, None), )?; client.schedule_layer_file_upload( &layer_file_name_2, - &LayerFileMetadata::new(content_2.len() as u64), + &LayerFileMetadata::new(content_2.len() as u64, None), )?; // Check that they are started immediately, not queued @@ -1209,7 +1216,7 @@ mod tests { std::fs::write(timeline_path.join("baz"), &content_baz)?; client.schedule_layer_file_upload( &layer_file_name_3, - &LayerFileMetadata::new(content_baz.len() as u64), + &LayerFileMetadata::new(content_baz.len() as u64, None), )?; client.schedule_layer_file_deletion(&[layer_file_name_1.clone()])?; { diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index 420edae6cd44..088204b3e358 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -9,6 +9,7 @@ use serde_with::{serde_as, DisplayFromStr}; use tracing::warn; use crate::tenant::metadata::TimelineMetadata; +use crate::tenant::storage_layer::Hole; use crate::tenant::storage_layer::LayerFileName; use utils::lsn::Lsn; @@ -21,31 +22,41 @@ use utils::lsn::Lsn; #[cfg_attr(test, derive(Default))] pub struct LayerFileMetadata { file_size: Option, + holes: 
Option>, } impl From<&'_ IndexLayerMetadata> for LayerFileMetadata { fn from(other: &IndexLayerMetadata) -> Self { LayerFileMetadata { file_size: other.file_size, + holes: other.holes.clone(), } } } impl LayerFileMetadata { - pub fn new(file_size: u64) -> Self { + pub fn new(file_size: u64, holes: Option>) -> Self { LayerFileMetadata { file_size: Some(file_size), + holes, } } /// This is used to initialize the metadata for remote layers, for which /// the metadata was missing from the index part file. - pub const MISSING: Self = LayerFileMetadata { file_size: None }; + pub const MISSING: Self = LayerFileMetadata { + file_size: None, + holes: None, + }; pub fn file_size(&self) -> Option { self.file_size } + pub fn holes(&self) -> Option> { + self.holes.clone() + } + /// Metadata has holes due to version upgrades. This method is called to upgrade self with the /// other value. /// @@ -59,6 +70,11 @@ impl LayerFileMetadata { changed = true; } + if self.holes != other.holes { + self.holes = other.holes.as_ref().or(self.holes.as_ref()).cloned(); + changed = true; + } + changed } } @@ -198,7 +214,7 @@ impl IndexPart { /// used to understand later versions. /// /// Version is currently informative only. 
- const LATEST_VERSION: usize = 1; + const LATEST_VERSION: usize = 2; pub const FILE_NAME: &'static str = "index_part.json"; pub fn new( @@ -233,12 +249,14 @@ impl IndexPart { #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)] pub struct IndexLayerMetadata { pub(super) file_size: Option, + pub(super) holes: Option>, } impl From<&'_ LayerFileMetadata> for IndexLayerMetadata { fn from(other: &'_ LayerFileMetadata) -> Self { IndexLayerMetadata { file_size: other.file_size, + holes: other.holes.clone(), } } } @@ -246,6 +264,7 @@ impl From<&'_ LayerFileMetadata> for IndexLayerMetadata { #[cfg(test)] mod tests { use super::*; + use crate::repository::Key; #[test] fn v0_indexpart_is_parsed() { @@ -288,11 +307,13 @@ mod tests { layer_metadata: HashMap::from([ ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { file_size: Some(25600000), + holes: None, }), ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { // serde_json should always parse this but this might be a double with jq for // example. file_size: Some(9007199254741001), + holes: None, }) ]), disk_consistent_lsn: "0/16960E8".parse::().unwrap(), @@ -326,11 +347,13 @@ mod tests { layer_metadata: HashMap::from([ ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { file_size: Some(25600000), + holes: None, }), ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { // serde_json should always parse this but this might be a double with jq for // example. 
file_size: Some(9007199254741001), + holes: None, }) ]), disk_consistent_lsn: "0/16960E8".parse::().unwrap(), @@ -341,4 +364,54 @@ mod tests { let part = part.remove_unclean_layer_file_names(); assert_eq!(part, expected); } + + #[test] + fn v2_indexpart_is_parsed() { + let example = r#"{ + "version": 2, + "timeline_layers":["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"], + "layer_metadata":{ + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { + "file_size": 25600000, + "holes": [{"start": "000000000000000000000000000000000000", "end": "010000000000000000000000000000000000"}] + }, + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { + "file_size": 9007199254741001, + "holes": [ + {"start": "010000000000000000000000000000000000", "end": "020000000000000000000000000000000001"}, + {"start": "030000000000000000000000000000000000", "end": "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"} + ] + } + }, + "disk_consistent_lsn":"0/16960E8", + 
"metadata_bytes":[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + }"#; + + let expected = IndexPart { + // note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead? 
+ version: 2, + timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap()]), + layer_metadata: HashMap::from([ + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { + file_size: Some(25600000), + holes: Some(vec![Hole(Key::from_hex("000000000000000000000000000000000000").unwrap()..Key::from_hex("010000000000000000000000000000000000").unwrap())]), + }), + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { + // serde_json should always parse this but this might be a double with jq for + // example. + file_size: Some(9007199254741001), + holes: Some(vec![Hole(Key::from_hex("010000000000000000000000000000000000").unwrap()..Key::from_hex("020000000000000000000000000000000001").unwrap()), + Hole(Key::from_hex("030000000000000000000000000000000000").unwrap()..Key::from_hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF").unwrap()), + ]), + }) + ]), + disk_consistent_lsn: "0/16960E8".parse::().unwrap(), + metadata_bytes: 
[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(), + }; + + let part = serde_json::from_str::(example) + .unwrap() + .remove_unclean_layer_file_names(); + assert_eq!(part, expected); + } } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index e85359af1626..bff40b3c8413 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -15,10 +15,13 @@ use anyhow::Result; use bytes::Bytes; use enum_map::EnumMap; use enumset::EnumSet; -use pageserver_api::models::LayerAccessKind; use pageserver_api::models::{ - HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus, + HistoricLayerInfo, LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, + LayerResidenceStatus, }; +use serde::ser::SerializeMap; +use serde::{de::MapAccess, de::Visitor, Deserialize, Deserializer, Serialize, Serializer}; +use std::fmt; use std::ops::Range; use std::path::PathBuf; use 
std::sync::{Arc, Mutex}; @@ -283,6 +286,25 @@ pub trait Layer: std::fmt::Debug + Send + Sync { /// Dump summary of the contents of the layer to stdout fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>; + + /// Checks if layer contains any entries belonging to the specified key range + fn overlaps(&self, key_range: &Range<Key>, _ctx: &RequestContext) -> Result<bool> { + Ok(range_overlaps(&self.get_key_range(), key_range)) + } + + /// Key sub-ranges of this layer that are not holes (by default the whole key range) + fn get_occupied_ranges(&self, _ctx: &RequestContext) -> Result<Vec<Range<Key>>> { + Ok(vec![self.get_key_range()]) + } + + /// Get list of holes in key range: returns up to MAX_CACHED_HOLES largest holes, ignoring any that are smaller + /// than MIN_HOLE_LENGTH. The default implementation reports no holes (`None`). + /// Only delta layers can contain holes. An image layer is considered always dense, despite the fact that it doesn't + /// contain all possible key values in the specified range: there can be no keys in the storage that belong + /// to the image layer range but are not present in the image layer. + fn get_holes(&self, _ctx: &RequestContext) -> Result<Option<Vec<Hole>>> { + Ok(None) + } } /// Returned by [`Layer::iter`] @@ -371,6 +393,69 @@ pub struct LayerDescriptor { pub short_id: String, } +/// Wrapper for key range to provide reverse ordering by range length (for BinaryHeap) +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Hole(pub Range<Key>); + +// Alter default serde serialization for Key class to reduce size of index_part.json file. +// Instead of dumping Key as json object with six fields, 
we just store the hex string representing each key (as in layer file names); malformed hex is reported as a deserialization error +impl Serialize for Hole { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let mut hole = serializer.serialize_map(Some(2))?; + hole.serialize_entry("start", &self.0.start.to_string())?; + hole.serialize_entry("end", &self.0.end.to_string())?; + hole.end() + } +} + +struct HoleVisitor; + +impl<'de> Visitor<'de> for HoleVisitor { + type Value = Hole; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + write!(formatter, "a map with keys 'start' and 'end'") + } + + fn visit_map<M>(self, mut map: M) -> Result<Hole, M::Error> + where + M: MapAccess<'de>, + { + let mut start: Option<String> = None; + let mut end: Option<String> = None; + + while let Some(k) = map.next_key::<String>()? { + if k == "start" { + start = Some(map.next_value()?); + } else if k == "end" { + end = Some(map.next_value()?); + } else { + return Err(serde::de::Error::custom(&format!("Invalid key: {}", k))); + } + } + + let (Some(start), Some(end)) = (start, end) else { + return Err(serde::de::Error::custom("Missing start or end")); + }; + let start = Key::from_hex(&start).map_err(<M::Error as serde::de::Error>::custom)?; + let end = Key::from_hex(&end).map_err(<M::Error as serde::de::Error>::custom)?; + + Ok(Hole(start..end)) + } +} + +impl<'de> Deserialize<'de> for Hole { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + deserializer.deserialize_map(HoleVisitor) + } +} + impl Layer for LayerDescriptor { fn get_key_range(&self) -> Range<Key> { self.key.clone() diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 9b322faa652e..37f6dc508f33 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -30,8 +30,9 @@ use crate::repository::{Key, Value, KEY_SIZE}; use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, 
VisitDirection}; +use crate::tenant::storage_layer::range_overlaps; use crate::tenant::storage_layer::{ - PersistentLayer, ValueReconstructResult, ValueReconstructState, + Hole, PersistentLayer, ValueReconstructResult, ValueReconstructState, }; use crate::virtual_file::VirtualFile; use crate::{walrecord, TEMP_FILE_SUFFIX}; @@ -40,6 +41,8 @@ use anyhow::{bail, ensure, Context, Result}; use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; +use std::cmp::Ordering; +use std::collections::BinaryHeap; use std::fs::{self, File}; use std::io::{BufWriter, Write}; use std::io::{Seek, SeekFrom}; @@ -60,6 +63,25 @@ use super::{ LayerKeyIter, LayerResidenceStatus, PathOrConf, }; +// Maximum number of holes per delta layer kept in memory by the layer map to speed up DeltaLayer::overlaps. +// Too many cached holes (across many layers) can cause a large memory footprint of the layer map. +const MAX_CACHED_HOLES: usize = 10; // TODO: move it to tenant config? (not available in layer methods) +const MIN_HOLE_LENGTH: i128 = (128 * 1024 * 1024 / PAGE_SZ) as i128; // TODO: use compaction_target? (see above) + +impl Ord for Hole { + fn cmp(&self, other: &Self) -> Ordering { + let other_len = other.0.end.to_i128() - other.0.start.to_i128(); + let self_len = self.0.end.to_i128() - self.0.start.to_i128(); + other_len.cmp(&self_len) // reversed on purpose: shorter holes compare greater, so BinaryHeap::pop() yields the shortest + } +} + +impl PartialOrd for Hole { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + /// /// Header stored in the beginning of the file /// @@ -169,9 +191,6 @@ impl DeltaKey { Lsn(u64::from_be_bytes(lsn_buf)) } } - -/// DeltaLayer is the in-memory data structure associated with an on-disk delta -/// file. /// /// We keep a DeltaLayer in memory for each file, in the LayerMap. If a layer /// is in "loaded" state, we have a copy of the index in memory, in 'inner'. @@ -213,6 +232,9 @@ pub struct DeltaLayerInner { /// Reader object for reading blocks from the file. 
(None if not loaded yet) file: Option>, + + /// Largest holes in this layer + holes: Option>, } impl std::fmt::Debug for DeltaLayerInner { @@ -315,6 +337,51 @@ impl Layer for DeltaLayer { Ok(()) } + fn get_occupied_ranges(&self, ctx: &RequestContext) -> Result>> { + if let Ok(inner) = self.load(LayerAccessKind::ExtractHoles, ctx) { + if let Some(holes) = &inner.holes { + let mut occ = Vec::with_capacity(holes.len() + 1); + let key_range = self.get_key_range(); + let mut prev = key_range.start; + for hole in holes { + occ.push(prev..hole.0.start); + prev = hole.0.end; + } + occ.push(prev..key_range.end); + return Ok(occ); + } + } + Ok(vec![self.get_key_range()]) + } + + fn get_holes(&self, ctx: &RequestContext) -> Result>> { + let inner = self.load(LayerAccessKind::ExtractHoles, ctx)?; + Ok(inner.holes.clone()) + } + + fn overlaps(&self, key_range: &Range, ctx: &RequestContext) -> anyhow::Result { + if !range_overlaps(&self.key_range, key_range) { + Ok(false) + } else { + let inner = self.load(LayerAccessKind::ExtractHoles, ctx)?; + if let Some(holes) = &inner.holes { + let start = match holes.binary_search_by_key(&key_range.start, |hole| hole.0.start) + { + Ok(index) => index, + Err(index) => { + if index == 0 { + return Ok(true); + } + index - 1 + } + }; + Ok(holes[start].0.end < key_range.end) + } else { + Ok(true) + } + } + } + fn get_value_reconstruct_data( &self, key: Key, @@ -585,6 +652,37 @@ impl DeltaLayer { debug!("loaded from {}", &path.display()); + // Construct vector with largest holes + let file = inner.file.as_ref().unwrap(); + let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( + inner.index_start_blk, + inner.index_root_blk, + file, + ); + // min-heap (reserve space for one more element added before eviction) + let mut heap: BinaryHeap = BinaryHeap::with_capacity(MAX_CACHED_HOLES + 1); + let mut prev_key: Option = None; + tree_reader.visit( + &[0u8; DELTA_KEY_SIZE], + VisitDirection::Forwards, + |key, _value| { + let curr = 
Key::from_slice(&key[..KEY_SIZE]); + if let Some(prev) = prev_key { + if curr.to_i128() - prev.to_i128() >= MIN_HOLE_LENGTH { + heap.push(Hole(prev..curr)); + if heap.len() > MAX_CACHED_HOLES { + heap.pop(); // remove smallest hole + } + } + } + prev_key = Some(curr.next()); + true + }, + )?; + let mut holes = heap.into_vec(); + holes.sort_by_key(|hole| hole.0.start); + inner.holes = Some(holes); + inner.loaded = true; Ok(()) } @@ -597,6 +695,7 @@ impl DeltaLayer { filename: &DeltaFileName, file_size: u64, access_stats: LayerAccessStats, + holes: Option>, ) -> DeltaLayer { DeltaLayer { path_or_conf: PathOrConf::Conf(conf), @@ -611,6 +710,7 @@ impl DeltaLayer { file: None, index_start_blk: 0, index_root_blk: 0, + holes, }), } } @@ -641,6 +741,7 @@ impl DeltaLayer { file: None, index_start_blk: 0, index_root_blk: 0, + holes: None, }), }) } @@ -812,6 +913,7 @@ impl DeltaLayerWriterInner { file: None, index_start_blk, index_root_blk, + holes: None, }), }; diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index 7391875d0c39..bfd9b9b2e5bc 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -5,7 +5,9 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::repository::Key; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; -use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; +use crate::tenant::storage_layer::{ + range_overlaps, Hole, Layer, ValueReconstructResult, ValueReconstructState, +}; use anyhow::{bail, Result}; use pageserver_api::models::HistoricLayerInfo; use std::ops::Range; @@ -42,7 +44,7 @@ pub struct RemoteLayer { pub layer_metadata: LayerFileMetadata, - is_delta: bool, + pub is_delta: bool, is_incremental: bool, @@ -87,6 +89,45 @@ impl Layer for RemoteLayer { self.is_incremental } + fn get_occupied_ranges(&self, _ctx: &RequestContext) 
-> Result>> { + if let Some(holes) = &self.layer_metadata.holes() { + let mut occ = Vec::with_capacity(holes.len() + 1); + let key_range = self.get_key_range(); + let mut prev = key_range.start; + for hole in holes { + occ.push(prev..hole.0.start); + prev = hole.0.end; + } + occ.push(prev..key_range.end); + Ok(occ) + } else { + Ok(vec![self.get_key_range()]) + } + } + + fn get_holes(&self, _ctx: &RequestContext) -> Result>> { + Ok(self.layer_metadata.holes()) + } + + fn overlaps(&self, key_range: &Range, _ctx: &RequestContext) -> anyhow::Result { + if !range_overlaps(&self.key_range, key_range) { + Ok(false) + } else if let Some(holes) = &self.layer_metadata.holes() { + let start = match holes.binary_search_by_key(&key_range.start, |hole| hole.0.start) { + Ok(index) => index, + Err(index) => { + if index == 0 { + return Ok(true); + } + index - 1 + } + }; + Ok(holes[start].0.end < key_range.end) + } else { + Ok(true) + } + } + /// debugging function to print out the contents of the layer fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> { println!( @@ -237,6 +278,7 @@ impl RemoteLayer { &self, conf: &'static PageServerConf, file_size: u64, + ctx: &RequestContext, ) -> Arc { if self.is_delta { let fname = DeltaFileName { @@ -251,6 +293,7 @@ impl RemoteLayer { file_size, self.access_stats .clone_for_residence_change(LayerResidenceStatus::Resident), + self.get_holes(ctx).unwrap(), )) } else { let fname = ImageFileName { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 838df6d88431..651522525701 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -869,7 +869,11 @@ impl Timeline { /// Like [`evict_layer_batch`], but for just one layer. /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`. 
- pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { + pub async fn evict_layer( + &self, + layer_file_name: &str, + ctx: &RequestContext, + ) -> anyhow::Result> { let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) }; let remote_client = self .remote_client @@ -878,7 +882,7 @@ impl Timeline { let cancel = CancellationToken::new(); let results = self - .evict_layer_batch(remote_client, &[local_layer], cancel) + .evict_layer_batch(remote_client, &[local_layer], cancel, ctx) .await?; assert_eq!(results.len(), 1); let result: Option> = results.into_iter().next().unwrap(); @@ -912,6 +916,7 @@ impl Timeline { remote_client: &Arc, layers_to_evict: &[Arc], cancel: CancellationToken, + ctx: &RequestContext, ) -> anyhow::Result>>> { // ensure that the layers have finished uploading // (don't hold the layer_removal_cs while we do it, we're not removing anything yet) @@ -933,7 +938,7 @@ impl Timeline { let res = if cancel.is_cancelled() { None } else { - Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut batch_updates)) + Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut batch_updates, ctx)) }; results.push(res); } @@ -952,9 +957,8 @@ impl Timeline { _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, local_layer: &Arc, batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>, + ctx: &RequestContext, ) -> anyhow::Result { - use super::layer_map::Replacement; - if local_layer.is_remote_layer() { return Ok(false); } @@ -963,6 +967,7 @@ impl Timeline { local_layer .file_size() .expect("Local layer should have a file size"), + local_layer.get_holes(ctx)?, ); let new_remote_layer = Arc::new(match local_layer.filename() { LayerFileName::Image(image_name) => RemoteLayer::new_img( @@ -985,37 +990,20 @@ impl Timeline { ), }); - let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? { - Replacement::Replaced { .. 
} => { - let layer_size = local_layer.file_size(); - - if let Err(e) = local_layer.delete() { - error!("failed to remove layer file on evict after replacement: {e:#?}"); - } - - if let Some(layer_size) = layer_size { - self.metrics.resident_physical_size_gauge.sub(layer_size); - } + let replaced = batch_updates.replace_historic(local_layer, new_remote_layer, ctx)?; + if replaced { + let layer_size = local_layer.file_size(); - true - } - Replacement::NotFound => { - debug!(evicted=?local_layer, "layer was no longer in layer map"); - false - } - Replacement::RemovalBuffered => { - unreachable!("not doing anything else in this batch") + if let Err(e) = local_layer.delete() { + error!("failed to remove layer file on evict after replacement: {e:#?}"); } - Replacement::Unexpected(other) => { - error!( - local_layer.ptr=?Arc::as_ptr(local_layer), - other.ptr=?Arc::as_ptr(&other), - ?other, - "failed to replace"); - false - } - }; + if let Some(layer_size) = layer_size { + self.metrics.resident_physical_size_gauge.sub(layer_size); + } + } else { + debug!(evicted=?local_layer, "layer was no longer in layer map"); + } Ok(replaced) } } @@ -1240,7 +1228,11 @@ impl Timeline { /// Scan the timeline directory to populate the layer map. /// Returns all timeline-related files that were found and loaded. /// - pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { + pub(super) fn load_layer_map( + &self, + disk_consistent_lsn: Lsn, + ctx: &RequestContext, + ) -> anyhow::Result<()> { let mut layers = self.layers.write().unwrap(); let mut updates = layers.batch_update(); let mut num_layers = 0; @@ -1284,7 +1276,7 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - updates.insert_historic(Arc::new(layer)); + updates.insert_historic(Arc::new(layer), ctx)?; num_layers += 1; } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { // Create a DeltaLayer struct for each delta file. 
@@ -1312,11 +1304,12 @@ impl Timeline { &deltafilename, file_size, LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), + None, ); trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - updates.insert_historic(Arc::new(layer)); + updates.insert_historic(Arc::new(layer), ctx)?; num_layers += 1; } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { // ignore these @@ -1363,6 +1356,7 @@ impl Timeline { index_part: &IndexPart, local_layers: HashMap>, up_to_date_disk_consistent_lsn: Lsn, + ctx: &RequestContext, ) -> anyhow::Result>> { // Are we missing some files that are present in remote storage? // Create RemoteLayer instances for them. @@ -1410,7 +1404,7 @@ impl Timeline { anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}"); } else { self.metrics.resident_physical_size_gauge.sub(local_size); - updates.remove_historic(local_layer); + updates.remove_historic(local_layer, ctx)?; // fall-through to adding the remote layer } } else { @@ -1453,7 +1447,7 @@ impl Timeline { ); let remote_layer = Arc::new(remote_layer); - updates.insert_historic(remote_layer); + updates.insert_historic(remote_layer, ctx)?; } LayerFileName::Delta(deltafilename) => { // Create a RemoteLayer for the delta file. @@ -1477,7 +1471,7 @@ impl Timeline { LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted), ); let remote_layer = Arc::new(remote_layer); - updates.insert_historic(remote_layer); + updates.insert_historic(remote_layer, ctx)?; } } } @@ -1502,11 +1496,12 @@ impl Timeline { /// # TODO /// May be a bit cleaner to do things based on populated remote client, /// and then do things based on its upload_queue.latest_files. 
- #[instrument(skip(self, index_part, up_to_date_metadata))] + #[instrument(skip(self, index_part, up_to_date_metadata, ctx))] pub async fn reconcile_with_remote( &self, up_to_date_metadata: &TimelineMetadata, index_part: Option<&IndexPart>, + ctx: &RequestContext, ) -> anyhow::Result<()> { info!("starting"); let remote_client = self @@ -1531,7 +1526,7 @@ impl Timeline { index_part.timeline_layers.len() ); remote_client.init_upload_queue(index_part)?; - self.create_remote_layers(index_part, local_layers, disk_consistent_lsn) + self.create_remote_layers(index_part, local_layers, disk_consistent_lsn, ctx) .await? } None => { @@ -1552,8 +1547,10 @@ impl Timeline { .with_context(|| format!("failed to get file {layer_path:?} metadata"))? .len(); info!("scheduling {layer_path:?} for upload"); - remote_client - .schedule_layer_file_upload(layer_name, &LayerFileMetadata::new(layer_size))?; + remote_client.schedule_layer_file_upload( + layer_name, + &LayerFileMetadata::new(layer_size, layer.get_holes(ctx)?), + )?; } remote_client.schedule_index_upload_for_file_changes()?; @@ -1829,6 +1826,7 @@ impl Timeline { _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, layer: Arc, updates: &mut BatchedUpdates<'_, dyn PersistentLayer>, + ctx: &RequestContext, ) -> anyhow::Result<()> { let layer_size = layer.file_size(); @@ -1842,7 +1840,7 @@ impl Timeline { // won't be needed for page reconstruction for this timeline, // and mark what we can't delete yet as deleted from the layer // map index without actually rebuilding the index. - updates.remove_historic(layer); + updates.remove_historic(layer, ctx)?; Ok(()) } @@ -2359,7 +2357,7 @@ impl Timeline { .await? } else { // normal case, write out a L0 delta layer file. 
- let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?; + let (delta_path, metadata) = self.create_delta_layer(&frozen_layer, ctx)?; HashMap::from([(delta_path, metadata)]) }; @@ -2465,6 +2463,7 @@ impl Timeline { fn create_delta_layer( &self, frozen_layer: &InMemoryLayer, + ctx: &RequestContext, ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { // Write it out let new_delta = frozen_layer.write_to_disk()?; @@ -2484,12 +2483,18 @@ impl Timeline { self.conf.timeline_path(&self.timeline_id, &self.tenant_id), ])?; + // + // This call force extraction of hole info from new delta layer so + // there is no need to perform this expensive operation under layer map write lock below + // + let holes = new_delta.get_holes(ctx)?; + // Add it to the layer map self.layers .write() .unwrap() .batch_update() - .insert_historic(Arc::new(new_delta)); + .insert_historic(Arc::new(new_delta), ctx)?; // update the timeline's physical size let sz = new_delta_path.metadata()?.len(); @@ -2499,7 +2504,7 @@ impl Timeline { self.metrics.num_persistent_files_created.inc_by(1); self.metrics.persistent_bytes_written.inc_by(sz); - Ok((new_delta_filename, LayerFileMetadata::new(sz))) + Ok((new_delta_filename, LayerFileMetadata::new(sz, holes))) } async fn repartition( @@ -2530,7 +2535,12 @@ impl Timeline { } // Is it time to create a new image layer for the given partition? 
- fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result { + fn time_for_new_image_layer( + &self, + partition: &KeySpace, + lsn: Lsn, + ctx: &RequestContext, + ) -> anyhow::Result { let layers = self.layers.read().unwrap(); for part_range in &partition.ranges { @@ -2556,7 +2566,7 @@ impl Timeline { if img_lsn < lsn { let threshold = self.get_image_creation_threshold(); let num_deltas = - layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?; + layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold), ctx)?; debug!( "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}", @@ -2582,7 +2592,7 @@ impl Timeline { let timer = self.metrics.create_images_time_histo.start_timer(); let mut image_layers: Vec = Vec::new(); for partition in partitioning.parts.iter() { - if force || self.time_for_new_image_layer(partition, lsn)? { + if force || self.time_for_new_image_layer(partition, lsn, ctx)? { let img_range = partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end; let mut image_layer_writer = ImageLayerWriter::new( @@ -2667,12 +2677,12 @@ impl Timeline { .metadata() .with_context(|| format!("reading metadata of layer file {}", path.file_name()))?; - layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len())); + layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len(), None)); self.metrics .resident_physical_size_gauge .add(metadata.len()); - updates.insert_historic(Arc::new(l)); + updates.insert_historic(Arc::new(l), ctx)?; } updates.flush(); drop(layers); @@ -2988,7 +2998,7 @@ impl Timeline { if let Some(remote_client) = &self.remote_client { remote_client.schedule_layer_file_upload( &l.filename(), - &LayerFileMetadata::new(metadata.len()), + &LayerFileMetadata::new(metadata.len(), l.get_holes(ctx)?), )?; } @@ -2997,9 +3007,12 @@ impl Timeline { .resident_physical_size_gauge .add(metadata.len()); - new_layer_paths.insert(new_delta_path, 
LayerFileMetadata::new(metadata.len())); + new_layer_paths.insert( + new_delta_path, + LayerFileMetadata::new(metadata.len(), l.get_holes(ctx)?), + ); let x: Arc = Arc::new(l); - updates.insert_historic(x); + updates.insert_historic(x, ctx)?; } // Now that we have reshuffled the data to set of new delta layers, we can @@ -3007,7 +3020,7 @@ impl Timeline { let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len()); for l in deltas_to_compact { layer_names_to_delete.push(l.filename()); - self.delete_historic_layer(layer_removal_cs, l, &mut updates)?; + self.delete_historic_layer(layer_removal_cs, l, &mut updates, ctx)?; } updates.flush(); drop(layers); @@ -3122,7 +3135,7 @@ impl Timeline { /// within a layer file. We can only remove the whole file if it's fully /// obsolete. /// - pub(super) async fn gc(&self) -> anyhow::Result { + pub(super) async fn gc(&self, ctx: &RequestContext) -> anyhow::Result { let timer = self.metrics.garbage_collect_histo.start_timer(); fail_point!("before-timeline-gc"); @@ -3152,6 +3165,7 @@ impl Timeline { pitr_cutoff, retain_lsns, new_gc_cutoff, + ctx, ) .instrument( info_span!("gc_timeline", timeline = %self.timeline_id, cutoff = %new_gc_cutoff), @@ -3171,6 +3185,7 @@ impl Timeline { pitr_cutoff: Lsn, retain_lsns: Vec, new_gc_cutoff: Lsn, + ctx: &RequestContext, ) -> anyhow::Result { let now = SystemTime::now(); let mut result: GcResult = GcResult::default(); @@ -3292,15 +3307,17 @@ impl Timeline { // If GC horizon is at 2500, we can remove layers A and B, but // we cannot remove C, even though it's older than 2500, because // the delta layer 2000-3000 depends on it. - if !layers - .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))? - { - debug!( - "keeping {} because it is the latest layer", - l.filename().file_name() - ); - result.layers_not_updated += 1; - continue 'outer; + for occupied_range in &l.get_occupied_ranges(ctx)? 
{ + if !layers + .image_layer_exists(occupied_range, &(l.get_lsn_range().end..new_gc_cutoff))? + { + debug!( + "keeping {} because it is the latest layer", + l.filename().file_name() + ); + result.layers_not_updated += 1; + continue 'outer; + } } // We didn't find any reason to keep this file, so remove it. @@ -3325,7 +3342,7 @@ impl Timeline { { for doomed_layer in layers_to_remove { layer_names_to_delete.push(doomed_layer.filename()); - self.delete_historic_layer(layer_removal_cs, doomed_layer, &mut updates)?; // FIXME: schedule succeeded deletions before returning? + self.delete_historic_layer(layer_removal_cs, doomed_layer, &mut updates, ctx)?; // FIXME: schedule succeeded deletions before returning? result.layers_removed += 1; } } @@ -3479,6 +3496,10 @@ impl Timeline { false, async move { let remote_client = self_clone.remote_client.as_ref().unwrap(); + let ctx = RequestContext::new( + TaskKind::DownloadAllRemoteLayers, + DownloadBehavior::Download, + ); // Does retries + exponential back-off internally. // When this fails, don't layer further retry attempts here. @@ -3493,45 +3514,33 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. - let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size); + let new_layer = + remote_layer.create_downloaded_layer(self_clone.conf, *size, &ctx); + + // Update the metadata for given layer file. The remote index file + // might be missing some information for the file; this allows us + // to fill in the missing details. 
+ if remote_layer.layer_metadata.file_size().is_none() + || (remote_layer.is_delta && remote_layer.layer_metadata.holes().is_none()) + { + let new_metadata = + LayerFileMetadata::new(*size, new_layer.get_holes(&ctx).unwrap()); + remote_client + .upgrade_layer_metadata( + &remote_layer.file_name, + &remote_layer.layer_metadata, + &new_metadata, + ) + .await; + } + let mut layers = self_clone.layers.write().unwrap(); let mut updates = layers.batch_update(); { - use crate::tenant::layer_map::Replacement; let l: Arc = remote_layer.clone(); - match updates.replace_historic(&l, new_layer) { - Ok(Replacement::Replaced { .. }) => { /* expected */ } - Ok(Replacement::NotFound) => { - // TODO: the downloaded file should probably be removed, otherwise - // it will be added to the layermap on next load? we should - // probably restart any get_reconstruct_data search as well. - // - // See: https://github.com/neondatabase/neon/issues/3533 - error!("replacing downloaded layer into layermap failed because layer was not found"); - } - Ok(Replacement::RemovalBuffered) => { - unreachable!("current implementation does not remove anything") - } - Ok(Replacement::Unexpected(other)) => { - // if the other layer would have the same pointer value as - // expected, it means they differ only on vtables. - // - // otherwise there's no known reason for this to happen as - // compacted layers should have different covering rectangle - // leading to produce Replacement::NotFound. - - error!( - expected.ptr = ?Arc::as_ptr(&l), - other.ptr = ?Arc::as_ptr(&other), - ?other, - "replacing downloaded layer into layermap failed because another layer was found instead of expected" - ); - } - Err(e) => { - // this is a precondition failure, the layer filename derived - // attributes didn't match up, which doesn't seem likely. - error!("replacing downloaded layer into layermap failed: {e:#?}") - } + if !updates.replace_historic(&l, new_layer, &ctx)? 
{ + // See: https://github.com/neondatabase/neon/issues/3533 + error!("replacing downloaded layer into layermap failed because layer was not found"); } } updates.flush(); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b35252243edd..a0cd6996e83f 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1531,11 +1531,13 @@ def from_json(cls, d: Dict[str, Any]) -> LayerMapInfo: assert isinstance(json_in_memory_layers, List) for json_in_memory_layer in json_in_memory_layers: info.in_memory_layers.append(InMemoryLayerInfo.from_json(json_in_memory_layer)) + info.in_memory_layers.sort(key=lambda info: info.lsn_start) json_historic_layers = d["historic_layers"] assert isinstance(json_historic_layers, List) for json_historic_layer in json_historic_layers: info.historic_layers.append(HistoricLayerInfo.from_json(json_historic_layer)) + info.historic_layers.sort(key=lambda info: info.layer_file_name) return info diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py index 3551f27cad56..7e648ef03196 100644 --- a/test_runner/regress/test_ondemand_download.py +++ b/test_runner/regress/test_ondemand_download.py @@ -1,6 +1,7 @@ # It's possible to run any regular test with the local fs remote storage via # env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ...... 
+import time from pathlib import Path import pytest @@ -212,6 +213,9 @@ def get_resident_physical_size(): log.info(filled_size) assert filled_current_physical == filled_size, "we don't yet do layer eviction" + # Wait until generated image layers are uploaded to S3 + time.sleep(3) + env.pageserver.stop() # remove all the layer files diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py index 6da6a4d4463c..769bc10280b8 100644 --- a/test_runner/regress/test_tenants_with_remote_storage.py +++ b/test_runner/regress/test_tenants_with_remote_storage.py @@ -280,6 +280,7 @@ def test_tenant_upgrades_index_json_from_v0( timeline_file.seek(0) json.dump(v0_index_part, timeline_file) + timeline_file.truncate(timeline_file.tell()) env.pageserver.start() pageserver_http = env.pageserver.http_client() diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index f210ac524b42..e9c23004da28 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit f210ac524b42d2d6f404f8505c64de36e977d17c +Subproject commit e9c23004da2891cdfe3c1f108b19ca7d93846643 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 33f976345490..39a65d110298 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 33f976345490351f951d72f81621c2263c186c9a +Subproject commit 39a65d11029806797bf9453bdc516a3a543c612b