This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Asynchronous snapshot upload #574

Merged
merged 8 commits on Sep 5, 2023
22 changes: 17 additions & 5 deletions bottomless/src/lib.rs
@@ -358,9 +358,12 @@ pub extern "C" fn xCheckpoint(
         return ffi::SQLITE_IOERR_WRITE;
     }
 
-    ctx.replicator.new_generation();
+    let prev = ctx.replicator.new_generation();
     tracing::debug!("Snapshotting after checkpoint");
-    let result = block_on!(ctx.runtime, ctx.replicator.snapshot_main_db_file());
+    let result = block_on!(
+        ctx.runtime,
+        ctx.replicator.snapshot_main_db_file(Some(prev))
+    );
     if let Err(e) = result {
         tracing::error!(
             "Failed to snapshot the main db file during checkpoint: {}",
@@ -418,9 +421,18 @@ async fn try_restore(replicator: &mut replicator::Replicator) -> i32 {
         Ok(replicator::RestoreAction::None) => (),
         Ok(replicator::RestoreAction::SnapshotMainDbFile) => {
             replicator.new_generation();
-            if let Err(e) = replicator.snapshot_main_db_file().await {
-                tracing::error!("Failed to snapshot the main db file: {}", e);
-                return ffi::SQLITE_CANTOPEN;
+            match replicator.snapshot_main_db_file(None).await {
+                Ok(Some(h)) => {
+                    if let Err(e) = h.await {
+                        tracing::error!("Failed to join snapshot main db file task: {}", e);
+                        return ffi::SQLITE_CANTOPEN;
+                    }
+                }
+                Ok(None) => {}
+                Err(e) => {
+                    tracing::error!("Failed to snapshot the main db file: {}", e);
+                    return ffi::SQLITE_CANTOPEN;
+                }
             }
             // Restoration process only leaves the local WAL file if it was
             // detected to be newer than its remote counterpart.
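
This hunk changes `snapshot_main_db_file` from a blocking call returning `Result<bool>` into one that schedules the upload and returns `Result<Option<JoinHandle<()>>>`. A minimal sketch of the caller-side contract, using a hypothetical stand-in type rather than the real `Replicator` (tokio and anyhow assumed):

```rust
use tokio::task::JoinHandle;

// Hypothetical stand-in for bottomless::replicator::Replicator,
// modeling only the contract exercised by this PR.
struct Replicator;

impl Replicator {
    // Returns a handle to the background upload, or None when there
    // was nothing to snapshot.
    async fn snapshot_main_db_file(&mut self) -> anyhow::Result<Option<JoinHandle<()>>> {
        Ok(Some(tokio::spawn(async {
            // ... upload the snapshot to S3 in the background ...
        })))
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut replicator = Replicator;
    // Callers that need the snapshot durably uploaded (e.g. restore)
    // block on the handle; checkpoint paths just let it run.
    if let Some(handle) = replicator.snapshot_main_db_file().await? {
        handle.await?; // surfaces panics/cancellation as JoinError
    }
    Ok(())
}
```
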
190 changes: 124 additions & 66 deletions bottomless/src/replicator.rs
@@ -5,6 +5,7 @@ use crate::uuid_utils::decode_unix_timestamp;
 use crate::wal::WalFileReader;
 use anyhow::anyhow;
 use arc_swap::ArcSwap;
+use async_compression::tokio::write::GzipEncoder;
 use aws_sdk_s3::error::SdkError;
 use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
 use aws_sdk_s3::operation::get_object::GetObjectError;
@@ -20,8 +21,10 @@ use std::path::Path;
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
+use tokio::fs::{File, OpenOptions};
 use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
 use tokio::sync::watch::{channel, Receiver, Sender};
+use tokio::task::JoinHandle;
 use tokio::task::JoinSet;
 use tokio::time::{timeout_at, Instant};
 use uuid::{NoContext, Uuid};
@@ -41,6 +44,8 @@ pub struct Replicator {
     /// Always: [last_committed_frame_no] <= [last_sent_frame_no].
     last_committed_frame_no: Receiver<Result<u32>>,
     flush_trigger: Sender<()>,
+    snapshot_waiter: Receiver<Result<Option<Arc<Uuid>>>>,
+    snapshot_notifier: Arc<Sender<Result<Option<Arc<Uuid>>>>>,
 
     pub page_size: usize,
     restore_transaction_page_swap_after: u32,
@@ -217,10 +222,6 @@ impl Replicator {
     }
 
     pub async fn with_options<S: Into<String>>(db_path: S, options: Options) -> Result<Self> {
-        tracing::trace!(
-            "Starting bottomless replicator with options: {:#?}",
-            options
-        );
         let config = options.client_config().await;
         let client = Client::from_conf(config);
         let bucket = options.bucket_name.clone();
@@ -345,6 +346,7 @@ impl Replicator {
                 }
             })
         };
+        let (snapshot_notifier, snapshot_waiter) = channel(Ok(None));
         Ok(Self {
             client,
             bucket,
@@ -358,6 +360,8 @@ impl Replicator {
             verify_crc: options.verify_crc,
             db_path,
             db_name,
+            snapshot_waiter,
+            snapshot_notifier: Arc::new(snapshot_notifier),
             restore_transaction_page_swap_after: options.restore_transaction_page_swap_after,
             restore_transaction_cache_fpath: options.restore_transaction_cache_fpath.into(),
             use_compression: options.use_compression,
@@ -379,6 +383,21 @@ impl Replicator {
         self.last_sent_frame_no.load(Ordering::Acquire)
     }
 
+    pub async fn wait_until_snapshotted(&mut self, generation: Arc<Uuid>) -> Result<()> {
+        let res = self
+            .snapshot_waiter
+            .wait_for(|result| match result {
+                Ok(Some(gen)) => *gen == generation,
+                Ok(None) => false,
+                Err(_) => true,
+            })
+            .await?;
+        match res.deref() {
+            Ok(_) => Ok(()),
+            Err(e) => Err(anyhow!("Failed snapshot generation {}: {}", generation, e)),
+        }
+    }
+
     /// Waits until the commit for a given frame_no or higher was given.
     pub async fn wait_until_committed(&mut self, frame_no: u32) -> Result<u32> {
         let res = self
@@ -473,20 +492,21 @@ impl Replicator {
     }
 
     // Starts a new generation for this replicator instance
-    pub fn new_generation(&mut self) {
-        tracing::debug!("New generation started: {}", self.generation);
-        self.set_generation(Self::generate_generation());
+    pub fn new_generation(&mut self) -> Arc<Uuid> {
+        tracing::debug!("Starting new generation: {}", self.generation);
+        self.set_generation(Self::generate_generation())
     }
 
     // Sets a generation for this replicator instance. This function
     // should be called if a generation number from S3-compatible storage
     // is reused in this session.
-    pub fn set_generation(&mut self, generation: Uuid) {
-        self.generation.swap(Arc::new(generation));
+    pub fn set_generation(&mut self, generation: Uuid) -> Arc<Uuid> {
+        let prev = self.generation.swap(Arc::new(generation));
         self.commits_in_current_generation
             .store(0, Ordering::Release);
         self.reset_frames(0);
         tracing::debug!("Generation set to {}", self.generation);
+        prev
     }
 
     // Returns the current last valid frame in the replicated log
@@ -552,23 +572,29 @@ impl Replicator {
 
     // Returns the compressed database file path and its change counter, extracted
     // from the header of page1 at offset 24..27 (as per SQLite documentation).
-    pub async fn compress_main_db_file(&self) -> Result<(&'static str, [u8; 4])> {
-        let compressed_db = "db.gz";
-        let mut reader = tokio::fs::File::open(&self.db_path).await?;
-        let mut writer = async_compression::tokio::write::GzipEncoder::new(
-            tokio::fs::File::create(compressed_db).await?,
-        );
-        let size = tokio::io::copy(&mut reader, &mut writer).await?;
-        tracing::trace!(
-            "Compressed database file ({} bytes) into {}",
-            size,
-            compressed_db
-        );
-        writer.shutdown().await?;
-        let change_counter = Self::read_change_counter(&mut reader).await?;
-        Ok((compressed_db, change_counter))
+    pub async fn maybe_compress_main_db_file(
+        mut reader: File,
+        compression: CompressionKind,
+    ) -> Result<ByteStream> {
+        reader.seek(SeekFrom::Start(0)).await?;
+        match compression {
+            CompressionKind::None => Ok(ByteStream::read_from().file(reader).build().await?),
+            CompressionKind::Gzip => {
+                let compressed_file = OpenOptions::new()
+                    .create(true)
+                    .write(true)
+                    .read(true)
+                    .truncate(true)
+                    .open("db.gz")
+                    .await?;
+                let mut writer = GzipEncoder::new(compressed_file);
+                let size = tokio::io::copy(&mut reader, &mut writer).await?;
+                tracing::trace!("Compressed database file ({} bytes) into db.gz", size);
+                writer.shutdown().await?;
+                Ok(ByteStream::from_path("db.gz").await?)
+            }
+        }
     }
 
     // Replicates local WAL pages to S3, if local WAL is present.
     // This function is called under the assumption that if local WAL
     // file is present, it was already detected to be newer than its
@@ -616,59 +642,83 @@ impl Replicator {
     // Sends the main database file to S3 - if -wal file is present, it's replicated
     // too - it means that the local file was detected to be newer than its remote
     // counterpart.
-    // returns whether the main db file was recovered.
-    pub async fn snapshot_main_db_file(&mut self) -> Result<bool> {
+    pub async fn snapshot_main_db_file(
+        &mut self,
+        prev_generation: Option<Arc<Uuid>>,
+    ) -> Result<Option<JoinHandle<()>>> {
         if !self.main_db_exists_and_not_empty().await {
             tracing::debug!("Not snapshotting, the main db file does not exist or is empty");
-            return Ok(false);
+            let _ = self.snapshot_notifier.send(Ok(prev_generation));
+            return Ok(None);
         }
         tracing::debug!("Snapshotting {}", self.db_path);
-        let start = Instant::now();
-        let change_counter = match self.use_compression {
-            CompressionKind::None => {
-                self.client
-                    .put_object()
-                    .bucket(&self.bucket)
-                    .key(format!("{}-{}/db.db", self.db_name, self.generation))
-                    .body(ByteStream::from_path(&self.db_path).await?)
-                    .send()
-                    .await?;
-                let mut reader = tokio::fs::File::open(&self.db_path).await?;
-                Self::read_change_counter(&mut reader).await?
-            }
-            CompressionKind::Gzip => {
-                // TODO: find a way to compress ByteStream on the fly instead of creating
-                // an intermediary file.
-                let (compressed_db_path, change_counter) = self.compress_main_db_file().await?;
-                let key = format!("{}-{}/db.gz", self.db_name, self.generation);
-                self.client
-                    .put_object()
-                    .bucket(&self.bucket)
-                    .key(key)
-                    .body(ByteStream::from_path(compressed_db_path).await?)
-                    .send()
-                    .await?;
-                let _ = tokio::fs::remove_file(compressed_db_path).await;
-                change_counter
-            }
-        };
+        let start_ts = Instant::now();
+        if let Some(prev) = prev_generation {
+            tracing::debug!("waiting for previous generation {} to complete", prev);
+            self.wait_until_snapshotted(prev).await?;
+        }
+
+        let client = self.client.clone();
+        let mut db_file = File::open(&self.db_path).await?;
+        let change_counter = Self::read_change_counter(&mut db_file).await?;
+        let gen = self.generation.load_full();
+        let snapshot_req = client.put_object().bucket(self.bucket.clone()).key(format!(
+            "{}-{}/db.{}",
+            self.db_name, gen, self.use_compression
+        ));
 
         /* FIXME: we can't rely on the change counter in WAL mode:
         ** "In WAL mode, changes to the database are detected using the wal-index and
         ** so the change counter is not needed. Hence, the change counter might not be
        ** incremented on each transaction in WAL mode."
         ** Instead, we need to consult WAL checksums.
         */
-        let change_counter_key = format!("{}-{}/.changecounter", self.db_name, self.generation);
-        self.client
+        let change_counter_key = format!("{}-{}/.changecounter", self.db_name, gen);
+        let change_counter_req = self
+            .client
             .put_object()
             .bucket(&self.bucket)
             .key(change_counter_key)
-            .body(ByteStream::from(Bytes::copy_from_slice(&change_counter)))
-            .send()
-            .await?;
-        tracing::debug!("Main db snapshot complete in {:?}", Instant::now() - start);
-        Ok(true)
+            .body(ByteStream::from(Bytes::copy_from_slice(
+                change_counter.as_ref(),
+            )));
+        let snapshot_notifier = self.snapshot_notifier.clone();
+        let compression = self.use_compression;
+        let handle = tokio::spawn(async move {
+            let start = Instant::now();
+            let body = match Self::maybe_compress_main_db_file(db_file, compression).await {
+                Ok(file) => file,
+                Err(e) => {
+                    tracing::error!("Failed to compress db file (generation {}): {}", gen, e);
+                    let _ = snapshot_notifier.send(Err(e));
+                    return;
+                }
+            };
+            let mut result = snapshot_req.body(body).send().await;
+            if let Err(e) = result {
+                tracing::error!("Failed to upload snapshot for generation {}: {:?}", gen, e);
+                let _ = snapshot_notifier.send(Err(e.into()));
+                return;
+            }
+            result = change_counter_req.send().await;
+            if let Err(e) = result {
+                tracing::error!(
+                    "Failed to upload change counter for generation {}: {:?}",
+                    gen,
+                    e
+                );
+                let _ = snapshot_notifier.send(Err(e.into()));
+                return;
+            }
+            let _ = snapshot_notifier.send(Ok(Some(gen)));
+            let elapsed = Instant::now() - start;
+            tracing::debug!("Snapshot upload finished (took {:?})", elapsed);
+            let _ = tokio::fs::remove_file(format!("db.{}", compression)).await;
+        });
+        let elapsed = Instant::now() - start_ts;
+        tracing::debug!("Scheduled DB snapshot (took {:?})", elapsed);
+
+        Ok(Some(handle))
     }
 
     // Returns newest replicated generation, or None, if one is not found.
@@ -817,6 +867,7 @@ impl Replicator {
             }
         }
 
+        let start_ts = Instant::now();
         // first check if there are any remaining files that we didn't manage to upload
        // on time in the last run
         self.upload_remaining_files(&generation).await?;
@@ -961,6 +1012,7 @@ impl Replicator {
                 None => {
                     if !key.ends_with(".gz")
                         && !key.ends_with(".db")
+                        && !key.ends_with(".raw")
                         && !key.ends_with(".meta")
                         && !key.ends_with(".changecounter")
                     {
@@ -1036,7 +1088,8 @@ impl Replicator {
         }
 
         main_db_writer.shutdown().await?;
-        tracing::info!("Finished database restoration");
+        let elapsed = Instant::now() - start_ts;
+        tracing::info!("Finished database restoration in {:?}", elapsed);
 
         if applied_wal_frame {
             Ok::<_, anyhow::Error>(RestoreAction::SnapshotMainDbFile)
@@ -1143,7 +1196,12 @@ impl Replicator {
 
     fn fpath_to_key<'a>(fpath: &'a Path, dir: &str) -> Option<&'a str> {
         let str = fpath.to_str()?;
-        if str.ends_with(".gz") | str.ends_with(".raw") | str.ends_with(".meta") {
+        if str.ends_with(".db")
+            | str.ends_with(".gz")
+            | str.ends_with(".raw")
+            | str.ends_with(".meta")
+            | str.ends_with(".changecounter")
+        {
             let idx = str.rfind(dir)?;
             return Some(&str[idx..]);
         }
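
The new `snapshot_notifier`/`snapshot_waiter` pair in this file is a `tokio::sync::watch` channel whose value is the last generation that finished uploading. A self-contained sketch of that handshake (the real channel carries `Result<Option<Arc<Uuid>>>`; error plumbing is elided here, and the `uuid`, `tokio`, and `anyhow` crates are assumed):

```rust
use std::sync::Arc;
use tokio::sync::watch;
use uuid::Uuid;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Shape of the replicator's snapshot_notifier / snapshot_waiter pair:
    // the channel stores the most recently snapshotted generation.
    let (notifier, mut waiter) = watch::channel::<Option<Arc<Uuid>>>(None);

    let generation = Arc::new(Uuid::new_v4());

    // Upload task: publish the generation once the upload finishes.
    let uploaded = generation.clone();
    tokio::spawn(async move {
        // ... compress and upload the snapshot here ...
        let _ = notifier.send(Some(uploaded));
    });

    // Waiter side, as in wait_until_snapshotted(): park until the
    // published generation matches the one we care about.
    waiter
        .wait_for(|current| current.as_deref() == Some(generation.as_ref()))
        .await?;
    println!("generation {generation} snapshotted");
    Ok(())
}
```
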
5 changes: 4 additions & 1 deletion sqld/src/lib.rs
@@ -455,7 +455,10 @@ pub async fn init_bottomless_replicator(
         bottomless::replicator::RestoreAction::None => (),
         bottomless::replicator::RestoreAction::SnapshotMainDbFile => {
             replicator.new_generation();
-            did_recover = replicator.snapshot_main_db_file().await?;
+            if let Some(handle) = replicator.snapshot_main_db_file(None).await? {
+                handle.await?;
+                did_recover = true;
+            }
             // Restoration process only leaves the local WAL file if it was
             // detected to be newer than its remote counterpart.
             replicator.maybe_replicate_wal().await?
6 changes: 3 additions & 3 deletions sqld/src/replication/primary/logger.rs
@@ -240,9 +240,9 @@ unsafe impl WalHook for ReplicationLoggerHook {
                 );
                 return SQLITE_IOERR_WRITE;
             }
-            replicator.new_generation();
-            if let Err(e) =
-                runtime.block_on(async move { replicator.snapshot_main_db_file().await })
+            let prev = replicator.new_generation();
+            if let Err(e) = runtime
+                .block_on(async move { replicator.snapshot_main_db_file(Some(prev)).await })
             {
                 tracing::error!("Failed to snapshot the main db file during checkpoint: {e}");
                 return SQLITE_IOERR_WRITE;
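
Taken together, the checkpoint paths now behave like a pipelined uploader: each checkpoint starts the upload for its new generation, and the next snapshot first waits until the previous generation has been published as complete. A toy model of that ordering, using the same Arc-wrapped watch sender the PR uses (generation numbers and task bodies are hypothetical):

```rust
use std::sync::Arc;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // Tracks the last generation whose snapshot finished uploading.
    let (tx, mut rx) = watch::channel(0u64);
    let tx = Arc::new(tx); // shared with the background upload tasks

    for gen in 1..=3u64 {
        // Backpressure: generation N's upload only starts after
        // generation N-1 has been published as complete.
        rx.wait_for(|&done| done >= gen - 1).await.unwrap();

        let tx = tx.clone();
        tokio::spawn(async move {
            // ... compress + upload snapshot for `gen` ...
            let _ = tx.send(gen);
        });
    }

    // Wait for the last generation before exiting.
    rx.wait_for(|&done| done == 3).await.unwrap();
}
```

This keeps at most one upload in flight per replicator while letting the checkpoint itself return as soon as the upload is scheduled.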