From eb7498c5f2d0ef0a32dad521df63eb6849c42509 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Thu, 31 Oct 2024 16:29:09 -0700 Subject: [PATCH] [ENH] Two-level manifest snapshot. --- .../examples/wal3-manifest-manager-bench.rs | 174 ------------------ rust/wal3/src/manifest.rs | 101 +++++++--- rust/wal3/src/manifest_manager.rs | 1 + 3 files changed, 78 insertions(+), 198 deletions(-) delete mode 100644 rust/wal3/examples/wal3-manifest-manager-bench.rs diff --git a/rust/wal3/examples/wal3-manifest-manager-bench.rs b/rust/wal3/examples/wal3-manifest-manager-bench.rs deleted file mode 100644 index 0b5553c6fc32..000000000000 --- a/rust/wal3/examples/wal3-manifest-manager-bench.rs +++ /dev/null @@ -1,174 +0,0 @@ -use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime}; - -use arrrg::CommandLine; -use biometrics::{Collector, Histogram}; -use guacamole::combinators::*; -use guacamole::Guacamole; -//use object_store::aws::{AmazonS3, AmazonS3Builder, S3ConditionalPut}; -use utf8path::Path; - -use wal3::{ - LatencyControlledObjectStore, LogWriterOptions, Manifest, ManifestManager, ShardFragment, - ShardID, ShardSeqNo, SimulationOptions, -}; - -//////////////////////////////////////////// biometrics //////////////////////////////////////////// - -static APPLY_LATENCY_IMPL: sig_fig_histogram::LockFreeHistogram<1000> = - sig_fig_histogram::LockFreeHistogram::new(2); -static APPLY_LATENCY: Histogram = - Histogram::new("wal3__benchmark__apply_histogram", &APPLY_LATENCY_IMPL); - -fn register_biometrics(collector: &Collector) { - collector.register_histogram(&APPLY_LATENCY); -} - -///////////////////////////////////////////// benchmark //////////////////////////////////////////// - -#[derive(Clone, Eq, PartialEq, arrrg_derive::CommandLine)] -pub struct Options { - #[arrrg(optional, "Path to the object store.")] - pub path: String, - #[arrrg(optional, "Number of seconds for which to run the benchmark.")] - pub runtime: usize, - #[arrrg(optional, "Target throughput in 
fragments per second per shard.")] - pub target_throughput: usize, - #[arrrg(optional, "Maximum number of tokio tasks to spawn.")] - pub max_tokio_tasks: usize, - #[arrrg(nested)] - pub object_store: SimulationOptions, - #[arrrg(nested)] - pub log: LogWriterOptions, -} - -impl Default for Options { - fn default() -> Self { - Options { - path: "wal3.data".to_string(), - runtime: 60, - target_throughput: 3_500, - max_tokio_tasks: 100_000, - object_store: SimulationOptions::default(), - log: LogWriterOptions::default(), - } - } -} - -#[tokio::main(flavor = "multi_thread")] -async fn main() { - let (options, free) = Options::from_command_line_relaxed("USAGE: wal3 [OPTIONS]"); - if !free.is_empty() { - eprintln!("command takes no positional arguments"); - std::process::exit(1); - } - // setup biometrics - let collector = Collector::new(); - register_biometrics(&collector); - wal3::register_biometrics(&collector); - let bio_prom_options = biometrics_prometheus::Options { - segment_size: 1 << 24, - flush_interval: Duration::from_secs(30), - prefix: Path::from("wal3.").into_owned(), - }; - let mut emitter = biometrics_prometheus::Emitter::new(bio_prom_options); - std::thread::spawn(move || loop { - std::thread::sleep(Duration::from_secs(1)); - collector - .emit( - &mut emitter, - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() as u64, - ) - .unwrap(); - }); - - // setup the manifest manager - /* - let object_store: AmazonS3 = AmazonS3Builder::from_env() - .with_bucket_name("chroma-robert-wal3-test-bucket") - .with_region("us-east-2") - .with_conditional_put(S3ConditionalPut::ETagMatch) - .build() - .unwrap(); - */ - let object_store = - object_store::local::LocalFileSystem::new_with_prefix(&options.path).unwrap(); - let object_store = LatencyControlledObjectStore::new( - options.object_store.clone(), - object_store, - Guacamole::new(0), - ); - let object_store = Arc::new(object_store); - Manifest::initialize(&options.log, 
&*object_store) - .await - .expect("manifest to initialize"); - let manifest = Manifest::load(&*object_store, options.log.load_alpha) - .await - .expect("manifest to load") - .expect("manifest to exist"); - let manifest_manager = Arc::new( - ManifestManager::new( - options.log.throttle_manifest, - options.log.snapshot_manifest, - manifest, - Arc::clone(&object_store), - ) - .await, - ); - - // Spawn the threads. - let mut threads = vec![]; - for idx in 1..=options.log.shards { - let options = options.clone(); - let manifest_manager = Arc::clone(&manifest_manager); - threads.push(tokio::task::spawn(async move { - let mut guac = Guacamole::new(unique_set_index(0xc0ffee)(idx) as u64); - let start = Instant::now(); - let mut next = Duration::ZERO; - let shard_id = ShardID(idx); - let mut shard_seq_no = ShardSeqNo(1); - loop { - let gap = interarrival_duration(options.target_throughput as f64)(&mut guac); - next += gap; - let elapsed = start.elapsed(); - if elapsed > Duration::from_secs(options.runtime as u64) { - break; - } else if elapsed < next { - tokio::time::sleep(next - elapsed).await; - } - let start = Instant::now(); - let (log_position, delta_seq_no) = manifest_manager - .assign_timestamp(1) - .expect("log should never fill"); - let delta = ShardFragment { - path: "doesn't matter".to_string(), - shard_id, - seq_no: shard_seq_no, - start: log_position, - limit: log_position + 1, - setsum: sst::Setsum::default(), - }; - manifest_manager - .apply_delta(delta, delta_seq_no) - .await - .expect("apply delta to succeed"); - let elapsed = start.elapsed(); - APPLY_LATENCY.observe(elapsed.as_micros() as f64); - let handle = tokio::runtime::Handle::current(); - let tasks_alive = handle.metrics().num_alive_tasks(); - if tasks_alive > options.max_tokio_tasks { - eprintln!("max tokio tasks exceeded: {tasks_alive}"); - break; - } - shard_seq_no += 1; - } - })); - } - // Wait for the threads to finish. 
- for thread in threads { - let _ = thread.await; - } -} diff --git a/rust/wal3/src/manifest.rs b/rust/wal3/src/manifest.rs index c5d6d0d10897..ed149f8faf5f 100644 --- a/rust/wal3/src/manifest.rs +++ b/rust/wal3/src/manifest.rs @@ -93,6 +93,8 @@ pub struct ShardFragment { } impl ShardFragment { + pub const JSON_SIZE_ESTIMATE: usize = 256; + pub fn contains_position(&self, position: LogPosition) -> bool { self.start <= position && position < self.limit } @@ -100,11 +102,6 @@ impl ShardFragment { pub fn scrub(&self) -> Result { Ok(self.setsum) } - - pub fn json_size(&self) -> usize { - // TODO(rescrv): Possibly make this accurate. It's an upper bound right now. - 256 - } } //////////////////////////////////////////// PrevPointer /////////////////////////////////////////// @@ -150,7 +147,12 @@ pub struct SnapPointer { serialize_with = "super::serialize_setsum" )] pub setsum: sst::Setsum, - pub path_to_manifest: String, + pub path_to_snapshot: String, + pub depth: u8, +} + +impl SnapPointer { + pub const JSON_SIZE_ESTIMATE: usize = 142; } ///////////////////////////////////////////// Snapshot ///////////////////////////////////////////// @@ -158,18 +160,45 @@ pub struct SnapPointer { #[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] pub struct Snapshot { pub path: String, + pub depth: u8, #[serde( deserialize_with = "super::deserialize_setsum", serialize_with = "super::serialize_setsum" )] pub setsum: sst::Setsum, pub writer: String, + pub snapshots: Vec, pub fragments: Vec, } impl Snapshot { pub fn scrub(&self) -> Result { + if !self.fragments.is_empty() && !self.snapshots.is_empty() { + return Err(ScrubError::CorruptManifest { + manifest: self.path.to_string(), + what: format!( + "snapshot contains both fragments and snapshots in {}: fragments:{} snapshots:{}", + self.path, + self.fragments.len(), + self.snapshots.len(), + ), + }); + } let mut acc = sst::Setsum::default(); + for snapshot in self.snapshots.iter() { + acc += snapshot.setsum; 
+ } + let depth = self.snapshots.iter().map(|s| s.depth).max().unwrap_or(0); + if depth + 1 != self.depth { + return Err(ScrubError::CorruptManifest{ + manifest: self.path.to_string(), + what: format!( + "expected snapshot depth does not match observed contents in {}: expected:{} != observed:{}", + self.path, + self.depth, + depth + 1, + )}); + } for frag in self.fragments.iter() { acc += frag.setsum; } @@ -261,13 +290,15 @@ impl Snapshot { arrrg_derive::CommandLine, )] pub struct SnapshotOptions { - pub rollover_threshold: usize, + pub snapshot_rollover_threshold: usize, + pub fragment_rollover_threshold: usize, } impl Default for SnapshotOptions { fn default() -> Self { SnapshotOptions { - rollover_threshold: 1 << 18, + snapshot_rollover_threshold: (1 << 18) / SnapPointer::JSON_SIZE_ESTIMATE, + fragment_rollover_threshold: (1 << 19) / ShardFragment::JSON_SIZE_ESTIMATE, } } } @@ -294,25 +325,45 @@ impl Manifest { pub fn generate_snapshot(&self, snapshot_options: SnapshotOptions) -> Option { // TODO(rescrv): A real, random string. let writer = "TODO".to_string(); - let mut fragments = Vec::with_capacity(self.fragments.len()); - let mut size = 0; - for fragment in self.fragments.iter() { - size += fragment.json_size(); - fragments.push(fragment.clone()); - if size >= snapshot_options.rollover_threshold { - break; - } - } - if !fragments.is_empty() && size >= snapshot_options.rollover_threshold { - let setsum = fragments - .iter() - .map(|f| f.setsum) - .fold(sst::Setsum::default(), |acc, x| acc + x); + let can_snapshot_snapshots = self.snapshots.iter().filter(|s| s.depth < 2).count() + >= snapshot_options.snapshot_rollover_threshold; + let can_snapshot_fragments = + self.fragments.len() >= snapshot_options.fragment_rollover_threshold; + if can_snapshot_snapshots || can_snapshot_fragments { + // NOTE(rescrv): We _either_ compact a snapshot of snapshots or a snapshot of log + // fragments. 
We don't do both as interior snapshot nodes only refer to objects of the + // same type. Manifests are the only objects to refer to both fragments and snapshots. + let mut snapshots = vec![]; + let mut fragments = vec![]; + let mut setsum = sst::Setsum::default(); + let depth = if can_snapshot_snapshots { + for snapshot in self.snapshots.iter() { + if snapshot.depth < 2 + && snapshots.len() < snapshot_options.snapshot_rollover_threshold + { + setsum += snapshot.setsum; + snapshots.push(snapshot.clone()); + } + } + 2 + } else if can_snapshot_fragments { + for fragment in self.fragments.iter() { + if fragments.len() < snapshot_options.fragment_rollover_threshold { + setsum += fragment.setsum; + fragments.push(fragment.clone()); + } + } + 1 + } else { + unreachable!(); + }; let path = snapshot_path(setsum); Some(Snapshot { path, + depth, setsum, writer, + snapshots, fragments, }) } else { @@ -338,11 +389,13 @@ impl Manifest { ))); } } + self.snapshots + .retain(|s| !snapshot.snapshots.iter().any(|t| t.setsum == s.setsum)); self.fragments = self.fragments.split_off(snapshot.fragments.len()); - println!("applied snapshot {:?}", self.fragments); self.snapshots.push(SnapPointer { setsum: snapshot.setsum, - path_to_manifest: snapshot.path.clone(), + path_to_snapshot: snapshot.path.clone(), + depth: snapshot.depth, }); Ok(()) } diff --git a/rust/wal3/src/manifest_manager.rs b/rust/wal3/src/manifest_manager.rs index 10361dc9429c..96560c36998a 100644 --- a/rust/wal3/src/manifest_manager.rs +++ b/rust/wal3/src/manifest_manager.rs @@ -88,6 +88,7 @@ impl Staging { if self.snapshots_staged.contains(&s.setsum) { self.snapshots_staged.retain(|ss| ss != &s.setsum); if let Err(err) = new_manifest.apply_snapshot(s) { + println!("apply snapshot failed: {:?}", err); APPLY_SNAPSHOT_FAILED.click(); } else { snapshot = None;