Skip to content

Commit

Permalink
fix(bootstrap): use atomic write crate and remove locks
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Dec 5, 2024
1 parent d8f3ac7 commit 2f90166
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 104 deletions.
34 changes: 23 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions ant-bootstrap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ local = []
[dependencies]
ant-logging = { path = "../ant-logging", version = "0.2.40" }
ant-protocol = { version = "0.17.15", path = "../ant-protocol" }
atomic-write-file = "0.2.2"
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.2.1", features = ["derive", "env"] }
dirs-next = "~2.0.0"
Expand All @@ -25,20 +26,16 @@ reqwest = { version = "0.12.2", default-features = false, features = [
] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tempfile = "3.8.1"
thiserror = "1.0"
tokio = { version = "1.0", features = ["time"] }
tracing = "0.1"
url = "2.4.0"
# fs2 fails to compile on wasm32 target
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
fs2 = "0.4.3"


[dev-dependencies]
wiremock = "0.5"
tokio = { version = "1.0", features = ["full", "test-util"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tempfile = "3.8.1"

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasmtimer = "0.2.0"
107 changes: 21 additions & 86 deletions ant-bootstrap/src/cache_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@ use crate::{
craft_valid_multiaddr, initial_peers::PeersArgs, multiaddr_get_peer_id, BootstrapAddr,
BootstrapAddresses, BootstrapCacheConfig, Error, Result,
};
#[cfg(not(target_arch = "wasm32"))]
use fs2::FileExt;
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};
use atomic_write_file::AtomicWriteFile;
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::Read;
use std::path::PathBuf;
use std::time::{Duration, SystemTime};
use tempfile::NamedTempFile;
use std::{
collections::{hash_map::Entry, HashMap},
fs::{self, OpenOptions},
io::{Read, Write},
path::PathBuf,
time::{Duration, SystemTime},
};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheData {
Expand Down Expand Up @@ -213,11 +211,6 @@ impl BootstrapCacheStore {
.open(&cfg.cache_file_path)
.inspect_err(|err| warn!("Failed to open cache file: {err}",))?;

// Acquire shared lock for reading
Self::acquire_shared_lock(&file).inspect_err(|err| {
warn!("Failed to acquire shared lock: {err}");
})?;

// Read the file contents
let mut contents = String::new();
file.read_to_string(&mut contents).inspect_err(|err| {
Expand Down Expand Up @@ -384,87 +377,29 @@ impl BootstrapCacheStore {
})
}

/// Acquire a shared lock on the cache file.
#[cfg(target_arch = "wasm32")]
fn acquire_shared_lock(_file: &File) -> Result<()> {
Ok(())
}

/// Acquire a shared lock on the cache file.
/// This is a no-op on WASM.
#[cfg(not(target_arch = "wasm32"))]
fn acquire_shared_lock(file: &File) -> Result<()> {
let file = file.try_clone()?;
file.try_lock_shared()?;

Ok(())
}

/// Acquire an exclusive lock on the cache file.
/// This is a no-op on WASM.
#[cfg(target_arch = "wasm32")]
async fn acquire_exclusive_lock(_file: &File) -> Result<()> {
Ok(())
}

/// Acquire an exclusive lock on the cache file.
#[cfg(not(target_arch = "wasm32"))]
async fn acquire_exclusive_lock(file: &File) -> Result<()> {
let mut backoff = Duration::from_millis(10);
let max_attempts = 5;
let mut attempts = 0;

loop {
match file.try_lock_exclusive() {
Ok(_) => return Ok(()),
Err(_) if attempts >= max_attempts => {
return Err(Error::LockError);
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
attempts += 1;
#[cfg(not(target_arch = "wasm32"))]
tokio::time::sleep(backoff).await;
#[cfg(target_arch = "wasm32")]
wasmtimer::tokio::sleep(backoff).await;
backoff *= 2;
}
Err(_) => return Err(Error::LockError),
}
}
}

async fn atomic_write(&self) -> Result<()> {
info!("Writing cache to disk: {:?}", self.cache_path);
debug!("Writing cache to disk: {:?}", self.cache_path);
// Create parent directory if it doesn't exist
if let Some(parent) = self.cache_path.parent() {
fs::create_dir_all(parent)?;
}

// Create a temporary file in the same directory as the cache file
let temp_dir = std::env::temp_dir();
let temp_file = NamedTempFile::new_in(&temp_dir)?;

// Write data to temporary file
serde_json::to_writer_pretty(&temp_file, &self.data)?;

// Open the target file with proper permissions
let file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&self.cache_path)?;
let mut file = AtomicWriteFile::options()
.open(&self.cache_path)
.inspect_err(|err| {
error!("Failed to open cache file using AtomicWriteFile: {err}");
})?;

// Acquire exclusive lock
Self::acquire_exclusive_lock(&file).await?;

// Perform atomic rename
temp_file.persist(&self.cache_path).inspect_err(|err| {
error!("Failed to persist file with err: {err:?}");
let data = serde_json::to_string_pretty(&self.data).inspect_err(|err| {
error!("Failed to serialize cache data: {err}");
})?;
writeln!(file, "{data}")?;
file.commit().inspect_err(|err| {
error!("Failed to commit atomic write: {err}");
})?;

info!("Cache written to disk: {:?}", self.cache_path);

// Lock will be automatically released when file is dropped
Ok(())
}
}
Expand Down
2 changes: 0 additions & 2 deletions ant-bootstrap/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ pub enum Error {
Json(#[from] serde_json::Error),
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
#[error("Persist error: {0}")]
Persist(#[from] tempfile::PersistError),
#[error("Lock error")]
LockError,
}
Expand Down

0 comments on commit 2f90166

Please sign in to comment.