Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

flush frames and confirm snapshot completion before calling SQLite checkpoint #704

Merged
merged 2 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bottomless/src/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl WalCopier {
if let Some(wal) = self.wal.as_mut() {
wal
} else {
return Err(anyhow!("WAL file not found: \"{:?}\"", self.wal_path));
return Err(anyhow!("WAL file not found: `{}`", self.wal_path));
}
};
let generation = if let Some(generation) = self.generation.load_full() {
Expand Down
63 changes: 38 additions & 25 deletions bottomless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,31 @@ pub extern "C" fn xCheckpoint(
tracing::trace!("Ignoring a checkpoint request weaker than TRUNCATE");
return ffi::SQLITE_OK;
}

let ctx = get_replicator_context(wal);
let last_known_frame = ctx.replicator.last_known_frame();
ctx.replicator.request_flush();
if last_known_frame == 0 {
tracing::debug!("No committed changes in this generation, not snapshotting");
ctx.replicator.skip_snapshot_for_current_generation();
return ffi::SQLITE_OK;
}
if let Err(e) = block_on!(
ctx.runtime,
ctx.replicator.wait_until_committed(last_known_frame)
) {
tracing::error!(
"Failed to finalize frame {} replication: {}",
last_known_frame,
e
);
return ffi::SQLITE_IOERR_WRITE;
}
if let Err(e) = block_on!(ctx.runtime, ctx.replicator.wait_until_snapshotted()) {
tracing::error!("Failed to finalize snapshot replication: {}", e);
return ffi::SQLITE_IOERR_WRITE;
}

/* If there's no busy handler, let's provide a default one,
** since we auto-upgrade the passive checkpoint
*/
Expand Down Expand Up @@ -342,31 +367,19 @@ pub extern "C" fn xCheckpoint(
return rc;
}

let ctx = get_replicator_context(wal);
let last_known_frame = ctx.replicator.last_known_frame();
ctx.replicator.request_flush();
if last_known_frame == 0 {
tracing::debug!("No committed changes in this generation, not snapshotting");
ctx.replicator.skip_snapshot_for_current_generation();
return ffi::SQLITE_OK;
}
if let Err(e) = block_on!(
ctx.runtime,
ctx.replicator.wait_until_committed(last_known_frame)
) {
tracing::error!("Failed to finalize replication: {}", e);
return ffi::SQLITE_IOERR_WRITE;
}

let prev = ctx.replicator.new_generation();
let _prev = ctx.replicator.new_generation();
tracing::debug!("Snapshotting after checkpoint");
let result = block_on!(ctx.runtime, ctx.replicator.snapshot_main_db_file(prev));
if let Err(e) = result {
tracing::error!(
"Failed to snapshot the main db file during checkpoint: {}",
e
);
return ffi::SQLITE_IOERR_WRITE;
match block_on!(ctx.runtime, ctx.replicator.snapshot_main_db_file()) {
Ok(_handle) => {
tracing::trace!("got snapshot handle");
}
Err(e) => {
tracing::error!(
"Failed to snapshot the main db file during checkpoint: {}",
e
);
return ffi::SQLITE_IOERR_WRITE;
}
}
tracing::debug!("Checkpoint completed in {:?}", Instant::now() - start);

Expand Down Expand Up @@ -417,7 +430,7 @@ async fn try_restore(replicator: &mut replicator::Replicator) -> i32 {
match replicator.restore(None, None).await {
Ok((replicator::RestoreAction::SnapshotMainDbFile, _)) => {
replicator.new_generation();
match replicator.snapshot_main_db_file(None).await {
match replicator.snapshot_main_db_file().await {
Ok(Some(h)) => {
if let Err(e) = h.await {
tracing::error!("Failed to join snapshot main db file task: {}", e);
Expand Down
56 changes: 31 additions & 25 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,18 +373,28 @@ impl Replicator {
self.last_sent_frame_no.load(Ordering::Acquire)
}

pub async fn wait_until_snapshotted(&mut self, generation: Uuid) -> Result<()> {
let res = self
.snapshot_waiter
.wait_for(|result| match result {
Ok(Some(gen)) => *gen == generation,
Ok(None) => false,
Err(_) => true,
})
.await?;
match res.deref() {
Ok(_) => Ok(()),
Err(e) => Err(anyhow!("Failed snapshot generation {}: {}", generation, e)),
/// Blocks until the snapshot for the current generation has been confirmed.
///
/// Returns `Ok(true)` when a snapshot for the current generation completed,
/// `Ok(false)` when there is nothing to wait for (no current generation, or
/// the main db file is absent/empty), and an error if the snapshot task for
/// this generation reported a failure.
pub async fn wait_until_snapshotted(&mut self) -> Result<bool> {
    // No current generation means no snapshot can be pending.
    let generation = match self.generation() {
        Ok(gen) => gen,
        Err(_) => return Ok(false),
    };
    if !self.main_db_exists_and_not_empty().await {
        // Nothing to snapshot; unblock any waiters on this generation.
        tracing::debug!("Not snapshotting, the main db file does not exist or is empty");
        let _ = self.snapshot_notifier.send(Ok(Some(generation)));
        return Ok(false);
    }
    tracing::debug!("waiting for generation snapshot {} to complete", generation);
    // Wait until the watch channel reports our generation (or an error).
    let observed = self
        .snapshot_waiter
        .wait_for(|status| match status {
            Ok(Some(completed)) => *completed == generation,
            Ok(None) => false,
            Err(_) => true,
        })
        .await?;
    match observed.deref() {
        Ok(_) => Ok(true),
        Err(e) => Err(anyhow!("Failed snapshot generation {}: {}", generation, e)),
    }
}

Expand Down Expand Up @@ -706,23 +716,18 @@ impl Replicator {
// Sends the main database file to S3 - if -wal file is present, it's replicated
// too - it means that the local file was detected to be newer than its remote
// counterpart.
pub async fn snapshot_main_db_file(
&mut self,
prev_generation: Option<Uuid>,
) -> Result<Option<JoinHandle<()>>> {
pub async fn snapshot_main_db_file(&mut self) -> Result<Option<JoinHandle<()>>> {
if !self.main_db_exists_and_not_empty().await {
tracing::debug!("Not snapshotting, the main db file does not exist or is empty");
let _ = self.snapshot_notifier.send(Ok(prev_generation));
let generation = self.generation()?;
tracing::debug!(
"Not snapshotting {}, the main db file does not exist or is empty",
generation
);
let _ = self.snapshot_notifier.send(Ok(Some(generation)));
return Ok(None);
}
let generation = self.generation()?;
tracing::debug!("Snapshotting generation {}", generation);
let start_ts = Instant::now();
if let Some(prev) = prev_generation {
tracing::debug!("waiting for previous generation {} to complete", prev);
self.wait_until_snapshotted(prev).await?;
}

let client = self.client.clone();
let mut db_file = File::open(&self.db_path).await?;
let change_counter = Self::read_change_counter(&mut db_file).await?;
Expand All @@ -749,6 +754,7 @@ impl Replicator {
let snapshot_notifier = self.snapshot_notifier.clone();
let compression = self.use_compression;
let handle = tokio::spawn(async move {
tracing::trace!("Start snapshotting generation {}", generation);
let start = Instant::now();
let body = match Self::maybe_compress_main_db_file(db_file, compression).await {
Ok(file) => file,
Expand Down Expand Up @@ -788,7 +794,7 @@ impl Replicator {
let _ = tokio::fs::remove_file(format!("db.{}", compression)).await;
});
let elapsed = Instant::now() - start_ts;
tracing::debug!("Scheduled DB snapshot (took {:?})", elapsed);
tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed);

Ok(Some(handle))
}
Expand Down
3 changes: 2 additions & 1 deletion sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;

pub(crate) static BLOCKING_RT: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_current_thread()
tokio::runtime::Builder::new_multi_thread()
.max_blocking_threads(50_000)
.enable_all()
.build()
.unwrap()
});
Expand Down
4 changes: 3 additions & 1 deletion sqld/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,9 @@ pub async fn init_bottomless_replicator(
match action {
bottomless::replicator::RestoreAction::SnapshotMainDbFile => {
replicator.new_generation();
replicator.snapshot_main_db_file(None).await?;
if let Some(_handle) = replicator.snapshot_main_db_file().await? {
tracing::trace!("got snapshot handle after restore with generation upgrade");
}
// Restoration process only leaves the local WAL file if it was
// detected to be newer than its remote counterpart.
replicator.maybe_replicate_wal().await?
Expand Down
53 changes: 35 additions & 18 deletions sqld/src/replication/primary/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,39 @@ unsafe impl WalHook for ReplicationLoggerHook {
return SQLITE_BUSY;
}
}

#[allow(clippy::await_holding_lock)]
// uncontended -> only gets called under a libSQL write lock
{
let ctx = Self::wal_extract_ctx(wal);
let runtime = tokio::runtime::Handle::current();
if let Some(replicator) = ctx.bottomless_replicator.as_mut() {
let mut replicator = replicator.lock().unwrap();
let last_known_frame = replicator.last_known_frame();
replicator.request_flush();
if last_known_frame == 0 {
tracing::debug!("No comitted changes in this generation, not snapshotting");
replicator.skip_snapshot_for_current_generation();
return SQLITE_OK;
}
if let Err(e) = runtime.block_on(replicator.wait_until_committed(last_known_frame))
{
tracing::error!(
"Failed to wait for S3 replicator to confirm {} frames backup: {}",
last_known_frame,
e
);
return SQLITE_IOERR_WRITE;
}
if let Err(e) = runtime.block_on(replicator.wait_until_snapshotted()) {
tracing::error!(
"Failed to wait for S3 replicator to confirm database snapshot backup: {}",
e
);
return SQLITE_IOERR_WRITE;
}
}
}
let rc = unsafe {
orig(
wal,
Expand All @@ -229,25 +262,9 @@ unsafe impl WalHook for ReplicationLoggerHook {
let runtime = tokio::runtime::Handle::current();
if let Some(replicator) = ctx.bottomless_replicator.as_mut() {
let mut replicator = replicator.lock().unwrap();
let last_known_frame = replicator.last_known_frame();
replicator.request_flush();
if last_known_frame == 0 {
tracing::debug!("No comitted changes in this generation, not snapshotting");
replicator.skip_snapshot_for_current_generation();
return SQLITE_OK;
}
if let Err(e) = runtime.block_on(replicator.wait_until_committed(last_known_frame))
{
tracing::error!(
"Failed to wait for S3 replicator to confirm {} frames backup: {}",
last_known_frame,
e
);
return SQLITE_IOERR_WRITE;
}
let prev = replicator.new_generation();
let _prev = replicator.new_generation();
if let Err(e) =
runtime.block_on(async move { replicator.snapshot_main_db_file(prev).await })
runtime.block_on(async move { replicator.snapshot_main_db_file().await })
{
tracing::error!("Failed to snapshot the main db file during checkpoint: {e}");
return SQLITE_IOERR_WRITE;
Expand Down