Skip to content

Commit

Permalink
[ENH] Two-level manifest snapshot.
Browse files Browse the repository at this point in the history
  • Loading branch information
rescrv committed Nov 19, 2024
1 parent 2647392 commit eb7498c
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 198 deletions.
174 changes: 0 additions & 174 deletions rust/wal3/examples/wal3-manifest-manager-bench.rs

This file was deleted.

101 changes: 77 additions & 24 deletions rust/wal3/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,15 @@ pub struct ShardFragment {
}

impl ShardFragment {
pub const JSON_SIZE_ESTIMATE: usize = 256;

pub fn contains_position(&self, position: LogPosition) -> bool {
self.start <= position && position < self.limit
}

pub fn scrub(&self) -> Result<sst::Setsum, ScrubError> {
Ok(self.setsum)
}

pub fn json_size(&self) -> usize {
// TODO(rescrv): Possibly make this accurate. It's an upper bound right now.
256
}
}

//////////////////////////////////////////// PrevPointer ///////////////////////////////////////////
Expand Down Expand Up @@ -150,26 +147,58 @@ pub struct SnapPointer {
serialize_with = "super::serialize_setsum"
)]
pub setsum: sst::Setsum,
pub path_to_manifest: String,
pub path_to_snapshot: String,
pub depth: u8,
}

impl SnapPointer {
pub const JSON_SIZE_ESTIMATE: usize = 142;
}

///////////////////////////////////////////// Snapshot /////////////////////////////////////////////

#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
pub struct Snapshot {
pub path: String,
pub depth: u8,
#[serde(
deserialize_with = "super::deserialize_setsum",
serialize_with = "super::serialize_setsum"
)]
pub setsum: sst::Setsum,
pub writer: String,
pub snapshots: Vec<SnapPointer>,
pub fragments: Vec<ShardFragment>,
}

impl Snapshot {
pub fn scrub(&self) -> Result<sst::Setsum, ScrubError> {
if !self.fragments.is_empty() && !self.snapshots.is_empty() {
return Err(ScrubError::CorruptManifest {
manifest: self.path.to_string(),
what: format!(
"snapshot contains both fragments and snapshots in {}: fragments:{} snapshots:{}",
self.path,
self.fragments.len(),
self.snapshots.len(),
),
});
}
let mut acc = sst::Setsum::default();
for snapshot in self.snapshots.iter() {
acc += snapshot.setsum;
}
let depth = self.snapshots.iter().map(|s| s.depth).max().unwrap_or(0);
if depth + 1 != self.depth {
return Err(ScrubError::CorruptManifest{
manifest: self.path.to_string(),
what: format!(
"expected snapshot depth does not match observed contents in {}: expected:{} != observed:{}",
self.path,
self.depth,
depth + 1,
)});
}
for frag in self.fragments.iter() {
acc += frag.setsum;
}
Expand Down Expand Up @@ -261,13 +290,15 @@ impl Snapshot {
arrrg_derive::CommandLine,
)]
pub struct SnapshotOptions {
pub rollover_threshold: usize,
pub snapshot_rollover_threshold: usize,
pub fragment_rollover_threshold: usize,
}

impl Default for SnapshotOptions {
fn default() -> Self {
SnapshotOptions {
rollover_threshold: 1 << 18,
snapshot_rollover_threshold: (1 << 18) / SnapPointer::JSON_SIZE_ESTIMATE,
fragment_rollover_threshold: (1 << 19) / ShardFragment::JSON_SIZE_ESTIMATE,
}
}
}
Expand All @@ -294,25 +325,45 @@ impl Manifest {
pub fn generate_snapshot(&self, snapshot_options: SnapshotOptions) -> Option<Snapshot> {
// TODO(rescrv): A real, random string.
let writer = "TODO".to_string();
let mut fragments = Vec::with_capacity(self.fragments.len());
let mut size = 0;
for fragment in self.fragments.iter() {
size += fragment.json_size();
fragments.push(fragment.clone());
if size >= snapshot_options.rollover_threshold {
break;
}
}
if !fragments.is_empty() && size >= snapshot_options.rollover_threshold {
let setsum = fragments
.iter()
.map(|f| f.setsum)
.fold(sst::Setsum::default(), |acc, x| acc + x);
let can_snapshot_snapshots = self.snapshots.iter().filter(|s| s.depth < 2).count()
>= snapshot_options.snapshot_rollover_threshold;
let can_snapshot_fragments =
self.fragments.len() >= snapshot_options.fragment_rollover_threshold;
if can_snapshot_snapshots || can_snapshot_fragments {
// NOTE(rescrv): We _either_ compact a snapshot of snapshots or a snapshot of log
// fragments. We don't do both as interior snapshot nodes only refer to objects of the
// same type. Manifests are the only objects to refer to both fragments and shards.
let mut snapshots = vec![];
let mut fragments = vec![];
let mut setsum = sst::Setsum::default();
let depth = if can_snapshot_snapshots {
for snapshot in self.snapshots.iter() {
if snapshot.depth < 2
&& snapshots.len() < snapshot_options.snapshot_rollover_threshold
{
setsum += snapshot.setsum;
snapshots.push(snapshot.clone());
}
}
2
} else if can_snapshot_fragments {
for fragment in self.fragments.iter() {
if fragments.len() < snapshot_options.fragment_rollover_threshold {
setsum += fragment.setsum;
fragments.push(fragment.clone());
}
}
1
} else {
unreachable!();
};
let path = snapshot_path(setsum);
Some(Snapshot {
path,
depth,
setsum,
writer,
snapshots,
fragments,
})
} else {
Expand All @@ -338,11 +389,13 @@ impl Manifest {
)));
}
}
self.snapshots
.retain(|s| !snapshot.snapshots.iter().any(|t| t.setsum == s.setsum));
self.fragments = self.fragments.split_off(snapshot.fragments.len());
println!("applied snapshot {:?}", self.fragments);
self.snapshots.push(SnapPointer {
setsum: snapshot.setsum,
path_to_manifest: snapshot.path.clone(),
path_to_snapshot: snapshot.path.clone(),
depth: snapshot.depth,
});
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions rust/wal3/src/manifest_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl Staging {
if self.snapshots_staged.contains(&s.setsum) {
self.snapshots_staged.retain(|ss| ss != &s.setsum);
if let Err(err) = new_manifest.apply_snapshot(s) {
println!("apply snapshot failed: {:?}", err);
APPLY_SNAPSHOT_FAILED.click();
} else {
snapshot = None;
Expand Down

0 comments on commit eb7498c

Please sign in to comment.