Skip to content

Commit

Permalink
Document & correctly handle local snapshot directory deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Dec 6, 2024
1 parent c56d2b1 commit 69b4a9a
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 15 deletions.
45 changes: 33 additions & 12 deletions crates/worker/src/partition/snapshots/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ impl SnapshotRepository {
)
}

/// Discover and download the latest snapshot available. Dropping the returned
/// `LocalPartitionSnapshot` will delete the local snapshot data files.
/// Discover and download the latest snapshot available. It is the caller's responsibility
/// to delete the snapshot directory when it is no longer needed.
#[instrument(
level = "debug",
skip_all,
Expand Down Expand Up @@ -397,10 +397,12 @@ impl SnapshotRepository {
}

if snapshot_metadata.cluster_name != self.cluster_name {
panic!("Snapshot does not match the cluster name of latest snapshot at destination in snapshot id {}! Expected: cluster name=\"{}\", found: \"{}\"",
// todo(pavel): revisit whether we should simply panic here instead of returning None - a cluster name mismatch is a bad sign!
warn!("Snapshot does not match the cluster name of latest snapshot at destination in snapshot id {}! Expected: cluster name=\"{}\", found: \"{}\"",
snapshot_metadata.snapshot_id,
self.cluster_name,
snapshot_metadata.cluster_name);
return Ok(None); // perhaps this needs to be a configuration error
}

// The snapshot ingest directory should be on the same filesystem as the partition store
Expand Down Expand Up @@ -747,12 +749,14 @@ mod tests {
let snapshot_source = TempDir::new()?;
let source_dir = snapshot_source.path().to_path_buf();

let mut data = tokio::fs::File::create(source_dir.join("data.sst")).await?;
data.write_all(b"snapshot-data").await?;
let data = b"snapshot-data";
let mut data_file = tokio::fs::File::create(source_dir.join("data.sst")).await?;
data_file.write_all(data).await?;

let snapshot = mock_snapshot_metadata(
"/data.sst".to_owned(),
source_dir.to_string_lossy().to_string(),
data.len(),
);

let snapshots_destination: TempDir = TempDir::new()?;
Expand Down Expand Up @@ -827,12 +831,14 @@ mod tests {
.await;
assert!(matches!(latest, Err(object_store::Error::NotFound { .. })));

let mut data = tokio::fs::File::create(source_dir.join("data.sst")).await?;
data.write_all(b"snapshot-data").await?;
let data = b"snapshot-data";
let mut data_file = tokio::fs::File::create(source_dir.join("data.sst")).await?;
data_file.write_all(data).await?;

let mut snapshot1 = mock_snapshot_metadata(
"/data.sst".to_owned(),
source_dir.to_string_lossy().to_string(),
data.len(),
);
snapshot1.min_applied_lsn = Lsn::new(
SystemTime::now()
Expand Down Expand Up @@ -888,12 +894,14 @@ mod tests {
let snapshot_source = TempDir::new()?;
let source_dir = snapshot_source.path().to_path_buf();

let mut data = tokio::fs::File::create(source_dir.join("data.sst")).await?;
data.write_all(b"snapshot-data").await?;
let data = b"snapshot-data";
let mut data_file = tokio::fs::File::create(source_dir.join("data.sst")).await?;
data_file.write_all(data).await?;

let mut snapshot2 = mock_snapshot_metadata(
"/data.sst".to_owned(),
source_dir.to_string_lossy().to_string(),
data.len(),
);
snapshot2.min_applied_lsn = snapshot1.min_applied_lsn.next();

Expand All @@ -914,13 +922,26 @@ mod tests {
latest
);

let latest = repository.get_latest(PartitionId::MIN).await?.unwrap();
assert_eq!(latest.min_applied_lsn, snapshot2.min_applied_lsn);
let local_path = latest.base_dir.as_path().to_string_lossy().to_string();
drop(latest);

let local_dir_exists = tokio::fs::try_exists(&local_path).await?;
assert!(local_dir_exists);
tokio::fs::remove_dir_all(&local_path).await?;

Ok(())
}

fn mock_snapshot_metadata(file_name: String, directory: String) -> PartitionSnapshotMetadata {
fn mock_snapshot_metadata(
file_name: String,
directory: String,
size: usize,
) -> PartitionSnapshotMetadata {
PartitionSnapshotMetadata {
version: SnapshotFormatVersion::V1,
cluster_name: "test-cluster".to_string(),
cluster_name: "cluster".to_string(),
node_name: "node".to_string(),
partition_id: PartitionId::MIN,
created_at: humantime::Timestamp::from(SystemTime::now()),
Expand All @@ -933,7 +954,7 @@ mod tests {
column_family_name: "data-0".to_owned(),
name: file_name,
directory,
size: 0,
size,
level: 0,
start_key: Some(vec![0]),
end_key: Some(vec![0xff, 0xff]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use std::ops::RangeInclusive;

use tokio::sync::{mpsc, watch};
use tracing::{debug, info, instrument};
use tracing::{debug, info, instrument, warn};

use restate_bifrost::Bifrost;
use restate_core::{Metadata, RuntimeTaskHandle, TaskCenter, TaskKind};
Expand Down Expand Up @@ -149,14 +149,38 @@ impl SpawnPartitionProcessorTask {
"Found snapshot to bootstrap partition, restoring it",
);

partition_store_manager
let snapshot_path = snapshot.base_dir.clone();
match partition_store_manager
.open_partition_store_from_snapshot(
partition_id,
key_range.clone(),
snapshot,
&options.storage.rocksdb,
)
.await?
.await {
Ok(partition_store) => {
let res = tokio::fs::remove_dir_all(&snapshot_path).await;
if let Err(e) = res {
// This is not critical; since we move the SST files into RocksDB on import, at
// worst the snapshot metadata file will be retained.
warn!(
partition_id = %partition_id,
?snapshot_path,
"Failed to remove local snapshot directory, continuing with startup: {:?}",
e
);
}
partition_store
}
Err(e) => {
warn!(
partition_id = %partition_id,
?snapshot_path,
"Failed to import snapshot, local copy retained"
);
return Err(anyhow::anyhow!(e));
}
}
} else {
info!(
partition_id = %partition_id,
Expand Down

0 comments on commit 69b4a9a

Please sign in to comment.