Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

changed the path where bottomless is storing gzipped db snapshot #727

Merged
merged 2 commits into from
Oct 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use bytes::{Buf, Bytes};
use chrono::{NaiveDateTime, TimeZone, Utc};
use std::io::SeekFrom;
use std::ops::Deref;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
Expand Down Expand Up @@ -642,28 +642,40 @@ impl Replicator {
// Returns the compressed database file path and its change counter, extracted
// from the header of page1 at offset 24..27 (as per SQLite documentation).
pub async fn maybe_compress_main_db_file(
mut reader: File,
db_path: &Path,
compression: CompressionKind,
) -> Result<ByteStream> {
reader.seek(SeekFrom::Start(0)).await?;
match compression {
CompressionKind::None => Ok(ByteStream::read_from().file(reader).build().await?),
CompressionKind::None => Ok(ByteStream::from_path(db_path).await?),
CompressionKind::Gzip => {
let mut reader = File::open(db_path).await?;
let gzip_path = Self::db_gzip_path(db_path);
let compressed_file = OpenOptions::new()
.create(true)
.write(true)
.read(true)
.truncate(true)
.open("db.gz")
.open(&gzip_path)
.await?;
let mut writer = GzipEncoder::new(compressed_file);
let size = tokio::io::copy(&mut reader, &mut writer).await?;
tracing::trace!("Compressed database file ({} bytes) into db.gz", size);
writer.shutdown().await?;
Ok(ByteStream::from_path("db.gz").await?)
tracing::debug!(
"Compressed database file ({} bytes) into `{}`",
size,
gzip_path.display()
);
Ok(ByteStream::from_path(gzip_path).await?)
}
}
}

fn db_gzip_path(db_path: &Path) -> PathBuf {
let mut gzip_path = db_path.to_path_buf();
gzip_path.pop();
gzip_path.join("db.gz")
}

// Replicates local WAL pages to S3, if local WAL is present.
// This function is called under the assumption that if local WAL
// file is present, it was already detected to be newer than its
Expand Down Expand Up @@ -729,8 +741,10 @@ impl Replicator {
let generation = self.generation()?;
let start_ts = Instant::now();
let client = self.client.clone();
let mut db_file = File::open(&self.db_path).await?;
let change_counter = Self::read_change_counter(&mut db_file).await?;
let change_counter = {
let mut db_file = File::open(&self.db_path).await?;
Self::read_change_counter(&mut db_file).await?
};
let snapshot_req = client.put_object().bucket(self.bucket.clone()).key(format!(
"{}-{}/db.{}",
self.db_name, generation, self.use_compression
Expand All @@ -753,14 +767,15 @@ impl Replicator {
)));
let snapshot_notifier = self.snapshot_notifier.clone();
let compression = self.use_compression;
let db_path = PathBuf::from(self.db_path.clone());
let handle = tokio::spawn(async move {
tracing::trace!("Start snapshotting generation {}", generation);
let start = Instant::now();
let body = match Self::maybe_compress_main_db_file(db_file, compression).await {
let body = match Self::maybe_compress_main_db_file(&db_path, compression).await {
Ok(file) => file,
Err(e) => {
tracing::error!(
"Failed to compress db file (generation {}): {}",
"Failed to compress db file (generation {}): {:?}",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also changed error printout to Debug format - hopefully it will be more informative than IO error we got so far.

generation,
e
);
Expand Down Expand Up @@ -791,7 +806,9 @@ impl Replicator {
let _ = snapshot_notifier.send(Ok(Some(generation)));
let elapsed = Instant::now() - start;
tracing::debug!("Snapshot upload finished (took {:?})", elapsed);
let _ = tokio::fs::remove_file(format!("db.{}", compression)).await;
// cleanup gzip database snapshot if exists
let gzip_path = Self::db_gzip_path(&db_path);
let _ = tokio::fs::remove_file(gzip_path).await;
});
let elapsed = Instant::now() - start_ts;
tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed);
Expand Down