From b5083a6422d164dfdf91e53c0c0e6f186e3da0ff Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Wed, 4 Oct 2023 07:44:37 +0200 Subject: [PATCH 1/2] changed the path where bottomless is storing gzipped db snapshot --- bottomless/src/replicator.rs | 41 +++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 76a25f1b..bfdddc52 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -18,7 +18,7 @@ use bytes::{Buf, Bytes}; use chrono::{NaiveDateTime, TimeZone, Utc}; use std::io::SeekFrom; use std::ops::Deref; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use tokio::fs::{File, OpenOptions}; @@ -642,28 +642,40 @@ impl Replicator { // Returns the compressed database file path and its change counter, extracted // from the header of page1 at offset 24..27 (as per SQLite documentation). pub async fn maybe_compress_main_db_file( - mut reader: File, + db_path: &PathBuf, compression: CompressionKind, ) -> Result { - reader.seek(SeekFrom::Start(0)).await?; match compression { - CompressionKind::None => Ok(ByteStream::read_from().file(reader).build().await?), + CompressionKind::None => Ok(ByteStream::from_path(db_path).await?), CompressionKind::Gzip => { + let mut reader = File::open(db_path).await?; + let gzip_path = Self::db_gzip_path(db_path); let compressed_file = OpenOptions::new() .create(true) .write(true) .read(true) .truncate(true) - .open("db.gz") + .open(&gzip_path) .await?; let mut writer = GzipEncoder::new(compressed_file); let size = tokio::io::copy(&mut reader, &mut writer).await?; - tracing::trace!("Compressed database file ({} bytes) into db.gz", size); writer.shutdown().await?; - Ok(ByteStream::from_path("db.gz").await?) + tracing::debug!( + "Compressed database file ({} bytes) into `{}`", + size, + gzip_path.display() + ); + Ok(ByteStream::from_path(gzip_path).await?) } } } + + fn db_gzip_path(db_path: &PathBuf) -> PathBuf { + let mut gzip_path = db_path.clone(); + gzip_path.pop(); + gzip_path.join("db.gz") + } + // Replicates local WAL pages to S3, if local WAL is present. // This function is called under the assumption that if local WAL // file is present, it was already detected to be newer than its @@ -729,8 +741,10 @@ impl Replicator { let generation = self.generation()?; let start_ts = Instant::now(); let client = self.client.clone(); - let mut db_file = File::open(&self.db_path).await?; - let change_counter = Self::read_change_counter(&mut db_file).await?; + let change_counter = { + let mut db_file = File::open(&self.db_path).await?; + Self::read_change_counter(&mut db_file).await? + }; let snapshot_req = client.put_object().bucket(self.bucket.clone()).key(format!( "{}-{}/db.{}", self.db_name, generation, self.use_compression @@ -753,14 +767,15 @@ impl Replicator { ))); let snapshot_notifier = self.snapshot_notifier.clone(); let compression = self.use_compression; + let db_path = PathBuf::from(self.db_path.clone()); let handle = tokio::spawn(async move { tracing::trace!("Start snapshotting generation {}", generation); let start = Instant::now(); - let body = match Self::maybe_compress_main_db_file(db_file, compression).await { + let body = match Self::maybe_compress_main_db_file(&db_path, compression).await { Ok(file) => file, Err(e) => { tracing::error!( - "Failed to compress db file (generation {}): {}", + "Failed to compress db file (generation {}): {:?}", generation, e ); @@ -791,7 +806,9 @@ impl Replicator { let _ = snapshot_notifier.send(Ok(Some(generation))); let elapsed = Instant::now() - start; tracing::debug!("Snapshot upload finished (took {:?})", elapsed); - let _ = tokio::fs::remove_file(format!("db.{}", compression)).await; + // cleanup gzip database snapshot if exists + let gzip_path = Self::db_gzip_path(&db_path); + let _ = tokio::fs::remove_file(gzip_path).await; }); let elapsed = Instant::now() - start_ts; tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed); From 02103c407fb5c1f227d3ef2efb58512890b48832 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Wed, 4 Oct 2023 08:25:14 +0200 Subject: [PATCH 2/2] fixed clippy suggestions --- bottomless/src/replicator.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index bfdddc52..be1f79ad 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -642,7 +642,7 @@ impl Replicator { // Returns the compressed database file path and its change counter, extracted // from the header of page1 at offset 24..27 (as per SQLite documentation). pub async fn maybe_compress_main_db_file( - db_path: &PathBuf, + db_path: &Path, compression: CompressionKind, ) -> Result { match compression { @@ -670,8 +670,8 @@ impl Replicator { } } - fn db_gzip_path(db_path: &PathBuf) -> PathBuf { - let mut gzip_path = db_path.clone(); + fn db_gzip_path(db_path: &Path) -> PathBuf { + let mut gzip_path = db_path.to_path_buf(); gzip_path.pop(); gzip_path.join("db.gz") }