Create db-dump.zip file too
Zip archives compress each file individually, which allows users to extract only the data they need instead of needlessly decompressing the full tarball to read the small table they are interested in.
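As an illustration of that partial-extraction benefit, here is a minimal sketch of a consumer reading a single table straight out of `db-dump.zip` with the same `zip` crate this commit adds. The entry name `data/categories.csv` comes from the snapshot test below; the use of `anyhow` for error handling is an assumption:

```rust
use std::fs::File;
use std::io::Read;

fn main() -> anyhow::Result<()> {
    // ZipArchive only needs Read + Seek on the archive file,
    // so nothing is extracted up front.
    let file = File::open("db-dump.zip")?;
    let mut archive = zip::ZipArchive::new(file)?;

    // Decompress just the one entry of interest, ignoring the rest.
    let mut entry = archive.by_name("data/categories.csv")?;
    let mut csv = String::new();
    entry.read_to_string(&mut csv)?;
    println!("{} bytes of CSV", csv.len());

    Ok(())
}
```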
Turbo87 committed May 28, 2024
1 parent f6e877d commit e61b20a
Showing 4 changed files with 204 additions and 36 deletions.
75 changes: 75 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -120,6 +120,7 @@ tracing-subscriber = { version = "=0.3.18", features = ["env-filter"] }
 typomania = { version = "=0.1.2", default-features = false }
 url = "=2.5.0"
 unicode-xid = "=0.2.4"
+zip = { version = "=2.1.1", default-features = false, features = ["deflate"] }
 
 [dev-dependencies]
 bytes = "=1.6.0"
37 changes: 35 additions & 2 deletions src/tests/dump_db.rs
@@ -9,7 +9,7 @@ use insta::{assert_debug_snapshot, assert_snapshot};
 use once_cell::sync::Lazy;
 use regex::Regex;
 use secrecy::ExposeSecret;
-use std::io::Read;
+use std::io::{Cursor, Read};
 use tar::Archive;
 
 static PATH_DATE_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"^\d{4}-\d{2}-\d{2}-\d{6}").unwrap());
@@ -28,8 +28,9 @@ async fn test_dump_db_job() {
     app.run_pending_background_jobs().await;
 
     let stored_files = app.stored_files().await;
-    assert_eq!(stored_files.len(), 1);
+    assert_eq!(stored_files.len(), 2);
     assert_eq!(stored_files[0], "db-dump.tar.gz");
+    assert_eq!(stored_files[1], "db-dump.zip");
 
     let path = object_store::path::Path::parse("db-dump.tar.gz").unwrap();
     let result = app.as_inner().storage.as_inner().get(&path).await.unwrap();
@@ -65,6 +66,38 @@ async fn test_dump_db_job() {
         "YYYY-MM-DD-HHMMSS/data/version_downloads.csv",
     ]
     "###);
+
+    let path = object_store::path::Path::parse("db-dump.zip").unwrap();
+    let result = app.as_inner().storage.as_inner().get(&path).await.unwrap();
+    let bytes = result.bytes().await.unwrap();
+
+    let archive = zip::ZipArchive::new(Cursor::new(bytes)).unwrap();
+    let zip_paths = archive.file_names().collect::<Vec<_>>();
+    assert_debug_snapshot!(zip_paths, @r###"
+    [
+        "README.md",
+        "export.sql",
+        "import.sql",
+        "metadata.json",
+        "schema.sql",
+        "data/",
+        "data/categories.csv",
+        "data/crate_downloads.csv",
+        "data/crates.csv",
+        "data/keywords.csv",
+        "data/metadata.csv",
+        "data/reserved_crate_names.csv",
+        "data/teams.csv",
+        "data/users.csv",
+        "data/crates_categories.csv",
+        "data/crates_keywords.csv",
+        "data/crate_owners.csv",
+        "data/versions.csv",
+        "data/default_versions.csv",
+        "data/dependencies.csv",
+        "data/version_downloads.csv",
+    ]
+    "###);
 }
 
 fn tar_paths<R: Read>(archive: &mut Archive<R>) -> Vec<String> {
127 changes: 93 additions & 34 deletions src/worker/jobs/dump_db.rs
@@ -6,6 +6,7 @@ use crates_io_worker::BackgroundJob;
 use std::fs::{self, File};
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
+use zip::write::SimpleFileOptions;
 
 #[derive(Clone, Serialize, Deserialize)]
 pub struct DumpDb {
@@ -28,38 +29,56 @@ impl BackgroundJob for DumpDb {
     /// Create CSV dumps of the public information in the database, wrap them in a
     /// tarball and upload to S3.
     async fn run(&self, env: Self::Context) -> anyhow::Result<()> {
-        let target_name = "db-dump.tar.gz";
+        const TAR_PATH: &str = "db-dump.tar.gz";
+        const ZIP_PATH: &str = "db-dump.zip";
 
         let database_url = self.database_url.clone();
 
-        let tarball = spawn_blocking(move || {
+        let (tarball, zip) = spawn_blocking(move || {
             let directory = DumpDirectory::create()?;
 
-            info!("Begin exporting database");
+            info!("Exporting database");
             directory.populate(&database_url)?;
 
             let export_dir = directory.path();
-            info!(path = ?export_dir, "Creating tarball");
-            let prefix = PathBuf::from(directory.timestamp.format("%Y-%m-%d-%H%M%S").to_string());
-            create_tarball(export_dir, &prefix)
+            info!(path = ?export_dir, "Creating tarball…");
+            let tarball_prefix =
+                PathBuf::from(directory.timestamp.format("%Y-%m-%d-%H%M%S").to_string());
+            create_archives(export_dir, &tarball_prefix)
         })
         .await?;
 
-        info!("Uploading tarball");
-        env.storage
-            .upload_db_dump(target_name, tarball.path())
-            .await?;
+        info!("Uploading tarball…");
+        env.storage.upload_db_dump(TAR_PATH, tarball.path()).await?;
         info!("Database dump tarball uploaded");
 
-        info!("Invalidating CDN caches");
+        info!("Invalidating CDN caches…");
+        if let Some(cloudfront) = env.cloudfront() {
+            if let Err(error) = cloudfront.invalidate(TAR_PATH).await {
+                warn!("Failed to invalidate CloudFront cache: {}", error);
+            }
+        }
+
+        if let Some(fastly) = env.fastly() {
+            if let Err(error) = fastly.invalidate(TAR_PATH).await {
+                warn!("Failed to invalidate Fastly cache: {}", error);
+            }
+        }
+
+        info!("Uploading zip file…");
+        env.storage.upload_db_dump(ZIP_PATH, zip.path()).await?;
+        info!("Database dump zip file uploaded");
+
+        info!("Invalidating CDN caches…");
         if let Some(cloudfront) = env.cloudfront() {
-            if let Err(error) = cloudfront.invalidate(target_name).await {
-                warn!("failed to invalidate CloudFront cache: {}", error);
+            if let Err(error) = cloudfront.invalidate(ZIP_PATH).await {
+                warn!("Failed to invalidate CloudFront cache: {}", error);
             }
         }
 
         if let Some(fastly) = env.fastly() {
-            if let Err(error) = fastly.invalidate(target_name).await {
-                warn!("failed to invalidate Fastly cache: {}", error);
+            if let Err(error) = fastly.invalidate(ZIP_PATH).await {
+                warn!("Failed to invalidate Fastly cache: {}", error);
             }
         }
 
@@ -202,15 +221,22 @@ pub fn run_psql(script: &Path, database_url: &str) -> anyhow::Result<()> {
     Ok(())
 }
 
-fn create_tarball(export_dir: &Path, prefix: &Path) -> anyhow::Result<tempfile::NamedTempFile> {
-    debug!("Creating tarball file");
-    let tempfile = tempfile::NamedTempFile::new()?;
-    let encoder = flate2::write::GzEncoder::new(tempfile.as_file(), flate2::Compression::default());
+fn create_archives(
+    export_dir: &Path,
+    tarball_prefix: &Path,
+) -> anyhow::Result<(tempfile::NamedTempFile, tempfile::NamedTempFile)> {
+    debug!("Creating tarball file…");
+    let tar_tempfile = tempfile::NamedTempFile::new()?;
+    let encoder =
+        flate2::write::GzEncoder::new(tar_tempfile.as_file(), flate2::Compression::default());
+    let mut tar = tar::Builder::new(encoder);
 
-    let mut archive = tar::Builder::new(encoder);
+    debug!("Creating zip file…");
+    let zip_tempfile = tempfile::NamedTempFile::new()?;
+    let mut zip = zip::ZipWriter::new(zip_tempfile.as_file());
 
-    debug!(path = ?prefix, "Appending directory to tarball");
-    archive.append_dir(prefix, export_dir)?;
+    debug!("Appending `{tarball_prefix:?}` directory to tarball");
+    tar.append_dir(tarball_prefix, export_dir)?;
 
     // Append readme, metadata, schemas.
     let mut paths = Vec::new();
@@ -224,9 +250,13 @@ fn create_tarball(export_dir: &Path, prefix: &Path) -> anyhow::Result<tempfile::NamedTempFile> {
     // Sort paths to make the tarball deterministic.
     paths.sort();
     for (path, file_name) in paths {
-        let name_in_tar = prefix.join(file_name);
-        debug!(name = ?name_in_tar, "Appending file to tarball");
-        archive.append_path_with_name(path, name_in_tar)?;
+        let name = tarball_prefix.join(&file_name);
+        debug!("Appending `{name:?}` file to tarball…");
+        tar.append_path_with_name(&path, name)?;
+
+        debug!("Appending `{file_name:?}` file to zip file…");
+        zip.start_file_from_path(&file_name, SimpleFileOptions::default())?;
+        std::io::copy(&mut File::open(path)?, &mut zip)?;
     }
 
     // Append topologically sorted tables to make it possible to pipeline
@@ -236,21 +266,34 @@ fn create_tarball(export_dir: &Path, prefix: &Path) -> anyhow::Result<tempfile::NamedTempFile> {
     let visibility_config = VisibilityConfig::get();
     let sorted_tables = visibility_config.topological_sort();
 
-    let path = prefix.join("data");
-    debug!(?path, "Appending directory to tarball");
-    archive.append_dir(path, export_dir.join("data"))?;
+    let path = tarball_prefix.join("data");
+    debug!("Appending `data` directory to tarball…");
+    tar.append_dir(path, export_dir.join("data"))?;
+
+    debug!("Appending `data` directory to zip file…");
+    zip.add_directory("data", SimpleFileOptions::default())?;
 
     for table in sorted_tables {
         let csv_path = export_dir.join("data").join(table).with_extension("csv");
         if csv_path.exists() {
-            let name_in_tar = prefix.join("data").join(table).with_extension("csv");
-            debug!(name = ?name_in_tar, "Appending file to tarball");
-            archive.append_path_with_name(csv_path, name_in_tar)?;
+            let name = tarball_prefix
+                .join("data")
+                .join(table)
+                .with_extension("csv");
+            debug!("Appending `{name:?}` file to tarball…");
+            tar.append_path_with_name(&csv_path, name)?;
+
+            let name = PathBuf::from("data").join(table).with_extension("csv");
+            debug!("Appending `{name:?}` file to zip file…");
+            zip.start_file_from_path(&name, SimpleFileOptions::default())?;
+            std::io::copy(&mut File::open(csv_path)?, &mut zip)?;
         }
     }
 
-    drop(archive);
+    drop(tar);
+    zip.finish()?;
 
-    Ok(tempfile)
+    Ok((tar_tempfile, zip_tempfile))
 }
 
 mod configuration;
@@ -261,6 +304,7 @@ mod tests {
     use super::*;
    use flate2::read::GzDecoder;
    use insta::assert_debug_snapshot;
+    use std::io::BufReader;
     use tar::Archive;
 
     #[test]
@@ -277,7 +321,7 @@
         fs::write(p.join("data").join("crate_owners.csv"), "").unwrap();
         fs::write(p.join("data").join("users.csv"), "").unwrap();
 
-        let tarball = create_tarball(p, &PathBuf::from("0000-00-00")).unwrap();
+        let (tarball, zip) = create_archives(p, &PathBuf::from("0000-00-00")).unwrap();
         let gz = GzDecoder::new(File::open(tarball.path()).unwrap());
         let mut tar = Archive::new(gz);
 
@@ -296,5 +340,20 @@
             "0000-00-00/data/crate_owners.csv",
         ]
         "###);
+
+        let file = File::open(zip.path()).unwrap();
+        let reader = BufReader::new(file);
+
+        let archive = zip::ZipArchive::new(reader).unwrap();
+        let zip_paths = archive.file_names().collect::<Vec<_>>();
+        assert_debug_snapshot!(zip_paths, @r###"
+        [
+            "README.md",
+            "data/",
+            "data/crates.csv",
+            "data/users.csv",
+            "data/crate_owners.csv",
+        ]
+        "###);
     }
 }
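Stripped of the crates.io specifics, the change in `create_archives` boils down to streaming each source file into a tar writer and a zip writer in the same pass. A condensed sketch of that pattern, assuming `anyhow` and illustrative output names (the `archive_both` helper is hypothetical, not part of this commit):

```rust
use std::fs::File;
use std::io::copy;
use std::path::Path;
use zip::write::SimpleFileOptions;

// Sketch: write one source file into both a gzipped tarball and a zip archive.
fn archive_both(src: &Path, name: &str) -> anyhow::Result<()> {
    // Tarball: a single gzip stream over the whole archive, so readers
    // must decompress everything up to the entry they want.
    let tar_file = File::create("dump.tar.gz")?;
    let encoder = flate2::write::GzEncoder::new(tar_file, flate2::Compression::default());
    let mut tar = tar::Builder::new(encoder);
    tar.append_path_with_name(src, name)?;

    // Zip: every entry is deflated independently and indexed by a central
    // directory, so readers can seek straight to a single entry.
    let zip_file = File::create("dump.zip")?;
    let mut zip = zip::ZipWriter::new(zip_file);
    zip.start_file_from_path(name, SimpleFileOptions::default())?;
    copy(&mut File::open(src)?, &mut zip)?;

    // Mirrors the cleanup in the diff above: a tar::Builder finalizes when
    // dropped, while a ZipWriter needs an explicit finish() to write the
    // central directory.
    drop(tar);
    zip.finish()?;
    Ok(())
}
```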
