diff --git a/Cargo.lock b/Cargo.lock index 26cbba58daa..0710cd3fc4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3578,7 +3578,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.6.19" +version = "0.6.20" dependencies = [ "anyhow", "async-trait", diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index ab62396dd4f..f249bc6a212 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.6.19" +version = "0.6.20" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-aggregator/src/artifact_builder/cardano_database.rs b/mithril-aggregator/src/artifact_builder/cardano_database.rs index 1b655e0ad76..1622ab789ce 100644 --- a/mithril-aggregator/src/artifact_builder/cardano_database.rs +++ b/mithril-aggregator/src/artifact_builder/cardano_database.rs @@ -137,9 +137,9 @@ mod tests { use crate::{ artifact_builder::{MockAncillaryFileUploader, MockImmutableFilesUploader}, immutable_file_digest_mapper::MockImmutableFileDigestMapper, + services::DumbSnapshotter, test_tools::TestLogger, tools::url_sanitizer::SanitizedUrlWithTrailingSlash, - DumbSnapshotter, }; use super::*; diff --git a/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/ancillary.rs b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/ancillary.rs index d1bfac107ae..b3be4c7fb52 100644 --- a/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/ancillary.rs +++ b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/ancillary.rs @@ -16,8 +16,8 @@ use mithril_common::{ use crate::{ file_uploaders::{GcpUploader, LocalUploader}, - snapshotter::OngoingSnapshot, - DumbUploader, FileUploader, Snapshotter, + services::{OngoingSnapshot, Snapshotter}, + DumbUploader, FileUploader, }; /// The [AncillaryFileUploader] trait allows identifying uploaders that return locations for ancillary archive files. @@ -211,10 +211,10 @@ mod tests { }; use uuid::Uuid; - use crate::{ - test_tools::TestLogger, CompressedArchiveSnapshotter, DumbSnapshotter, - SnapshotterCompressionAlgorithm, + use crate::services::{ + CompressedArchiveSnapshotter, DumbSnapshotter, SnapshotterCompressionAlgorithm, }; + use crate::test_tools::TestLogger; use super::*; diff --git a/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/immutable.rs b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/immutable.rs index ea83858ad6b..6896aa59a40 100644 --- a/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/immutable.rs +++ b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/immutable.rs @@ -17,7 +17,8 @@ use mithril_common::{ use crate::{ file_uploaders::{GcpUploader, LocalUploader}, - DumbUploader, FileUploader, Snapshotter, + services::Snapshotter, + DumbUploader, FileUploader, }; fn immmutable_file_number_extractor(file_uri: &str) -> StdResult> { @@ -224,11 +225,11 @@ mod tests { use mockall::predicate::{always, eq}; use uuid::Uuid; - use crate::{ - snapshotter::{MockSnapshotter, OngoingSnapshot}, - test_tools::TestLogger, - CompressedArchiveSnapshotter, DumbSnapshotter, SnapshotterCompressionAlgorithm, + use crate::services::{ + CompressedArchiveSnapshotter, DumbSnapshotter, MockSnapshotter, OngoingSnapshot, + SnapshotterCompressionAlgorithm, }; + use crate::test_tools::TestLogger; use super::*; @@ -767,8 +768,7 @@ mod tests { let url_prefix = SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root").unwrap(); - let uploader = - LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout()).unwrap(); + let uploader = LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout()); let location = ImmutableFilesUploader::batch_upload( &uploader, &[archive_1.clone(), archive_2.clone()], @@ -802,8 +802,7 @@ mod tests { let url_prefix = SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root").unwrap(); - let uploader = - LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout()).unwrap(); + let uploader = LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout()); ImmutableFilesUploader::batch_upload(&uploader, &[archive]) .await diff --git a/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs b/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs index 91a068138b8..64aa4efbb94 100644 --- a/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs +++ b/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs @@ -7,7 +7,10 @@ use std::path::Path; use std::sync::Arc; use thiserror::Error; -use crate::{snapshotter::OngoingSnapshot, FileUploader, Snapshotter}; +use crate::{ + services::{OngoingSnapshot, Snapshotter}, + FileUploader, +}; use super::ArtifactBuilder; use mithril_common::logging::LoggerExtensions; @@ -178,7 +181,8 @@ mod tests { use mithril_common::{entities::CompressionAlgorithm, test_utils::fake_data}; use crate::{ - file_uploaders::MockFileUploader, test_tools::TestLogger, DumbSnapshotter, DumbUploader, + file_uploaders::MockFileUploader, services::DumbSnapshotter, test_tools::TestLogger, + DumbUploader, }; use super::*; diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index 25330efce25..1e22cde1692 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -75,19 +75,19 @@ use crate::{ }, services::{ AggregatorSignableSeedBuilder, AggregatorUpkeepService, BufferedCertifierService, - CardanoTransactionsImporter, CertifierService, EpochServiceDependencies, MessageService, - MithrilCertifierService, MithrilEpochService, MithrilMessageService, MithrilProverService, + CardanoTransactionsImporter, CertifierService, CompressedArchiveSnapshotter, + DumbSnapshotter, EpochServiceDependencies, MessageService, MithrilCertifierService, + MithrilEpochService, MithrilMessageService, MithrilProverService, MithrilSignedEntityService, MithrilStakeDistributionService, ProverService, - SignedEntityService, SignedEntityServiceArtifactsDependencies, StakeDistributionService, - UpkeepService, UsageReporter, + SignedEntityService, SignedEntityServiceArtifactsDependencies, Snapshotter, + SnapshotterCompressionAlgorithm, StakeDistributionService, UpkeepService, UsageReporter, }, store::CertificatePendingStorer, tools::{CExplorerSignerRetriever, GenesisToolsDependency, SignersImporter}, - AggregatorConfig, AggregatorRunner, AggregatorRuntime, CompressedArchiveSnapshotter, - Configuration, DependencyContainer, DumbSnapshotter, DumbUploader, EpochSettingsStorer, - ImmutableFileDigestMapper, LocalSnapshotUploader, MetricsService, MithrilSignerRegisterer, - MultiSigner, MultiSignerImpl, SingleSignatureAuthenticator, SnapshotUploaderType, Snapshotter, - SnapshotterCompressionAlgorithm, VerificationKeyStorer, + AggregatorConfig, AggregatorRunner, AggregatorRuntime, Configuration, DependencyContainer, + DumbUploader, EpochSettingsStorer, ImmutableFileDigestMapper, LocalSnapshotUploader, + MetricsService, MithrilSignerRegisterer, MultiSigner, MultiSignerImpl, + SingleSignatureAuthenticator, SnapshotUploaderType, VerificationKeyStorer, }; const SQLITE_FILE: &str = "aggregator.sqlite3"; @@ -490,7 +490,7 @@ impl DependenciesBuilder { self.configuration.get_server_url()?, &snapshot_artifacts_dir, logger, - )?)) + ))) } } } else { @@ -1283,7 +1283,7 @@ impl DependenciesBuilder { ancillary_url_prefix, &target_dir, logger, - )?)]) + ))]) } } } else { @@ -1321,7 +1321,7 @@ impl DependenciesBuilder { immutable_url_prefix, &target_dir, logger, - )?)]) + ))]) } } } else { @@ -1364,7 +1364,7 @@ impl DependenciesBuilder { digests_url_prefix, &target_dir, logger, - )?)]) + ))]) } } } else { diff --git a/mithril-aggregator/src/dependency_injection/containers.rs b/mithril-aggregator/src/dependency_injection/containers.rs index 50de0d77a0f..581c220a905 100644 --- a/mithril-aggregator/src/dependency_injection/containers.rs +++ b/mithril-aggregator/src/dependency_injection/containers.rs @@ -36,12 +36,12 @@ use crate::{ multi_signer::MultiSigner, services::{ CertifierService, EpochService, MessageService, ProverService, SignedEntityService, - StakeDistributionService, TransactionStore, UpkeepService, + Snapshotter, StakeDistributionService, TransactionStore, UpkeepService, }, signer_registerer::SignerRecorder, store::CertificatePendingStorer, EpochSettingsStorer, MetricsService, SignerRegisterer, SignerRegistrationRoundOpener, - SingleSignatureAuthenticator, Snapshotter, VerificationKeyStorer, + SingleSignatureAuthenticator, VerificationKeyStorer, }; /// EpochServiceWrapper wraps a [EpochService] diff --git a/mithril-aggregator/src/file_uploaders/local_snapshot_uploader.rs b/mithril-aggregator/src/file_uploaders/local_snapshot_uploader.rs index f49c6e42315..98e90d53bba 100644 --- a/mithril-aggregator/src/file_uploaders/local_snapshot_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/local_snapshot_uploader.rs @@ -28,15 +28,15 @@ impl LocalSnapshotUploader { server_url_prefix: SanitizedUrlWithTrailingSlash, target_location: &Path, logger: Logger, - ) -> StdResult { + ) -> Self { let logger = logger.new_with_component_name::(); debug!(logger, "New LocalSnapshotUploader created"; "server_url_prefix" => &server_url_prefix.as_str()); - Ok(Self { + Self { server_url_prefix, target_location: target_location.to_path_buf(), logger, - }) + } } } @@ -100,8 +100,7 @@ mod tests { let url_prefix = SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root").unwrap(); let uploader = - LocalSnapshotUploader::new(url_prefix, target_dir.path(), TestLogger::stdout()) - .unwrap(); + LocalSnapshotUploader::new(url_prefix, target_dir.path(), TestLogger::stdout()); let location = uploader .upload(&archive) .await @@ -120,8 +119,7 @@ mod tests { SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root/").unwrap(), target_dir.path(), TestLogger::stdout(), - ) - .unwrap(); + ); uploader.upload(&archive).await.unwrap(); assert!(target_dir @@ -140,8 +138,7 @@ mod tests { SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root/").unwrap(), target_dir.path(), TestLogger::stdout(), - ) - .unwrap(); + ); uploader .upload(source_dir.path()) .await diff --git a/mithril-aggregator/src/file_uploaders/local_uploader.rs b/mithril-aggregator/src/file_uploaders/local_uploader.rs index 9e4085418ae..da52a6cba24 100644 --- a/mithril-aggregator/src/file_uploaders/local_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/local_uploader.rs @@ -26,15 +26,15 @@ impl LocalUploader { server_url_prefix: SanitizedUrlWithTrailingSlash, target_location: &Path, logger: Logger, - ) -> StdResult { + ) -> Self { let logger = logger.new_with_component_name::(); debug!(logger, "New LocalUploader created"; "server_url_prefix" => &server_url_prefix.as_str()); - Ok(Self { + Self { server_url_prefix, target_location: target_location.to_path_buf(), logger, - }) + } } } @@ -101,7 +101,7 @@ mod tests { let url_prefix = SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root").unwrap(); - let uploader = LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout()).unwrap(); + let uploader = LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout()); let location = FileUploader::upload(&uploader, &archive) .await .expect("local upload should not fail"); @@ -125,8 +125,7 @@ mod tests { SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root/").unwrap(), &target_dir, TestLogger::stdout(), - ) - .unwrap(); + ); FileUploader::upload(&uploader, &archive).await.unwrap(); assert!(target_dir.join(archive.file_name().unwrap()).exists()); @@ -146,8 +145,7 @@ mod tests { SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root/").unwrap(), &target_dir, TestLogger::stdout(), - ) - .unwrap(); + ); FileUploader::upload(&uploader, &source_dir) .await .expect_err("Uploading a directory should fail"); diff --git a/mithril-aggregator/src/lib.rs b/mithril-aggregator/src/lib.rs index b5ade4ead3f..51fa9b5c6e1 100644 --- a/mithril-aggregator/src/lib.rs +++ b/mithril-aggregator/src/lib.rs @@ -27,7 +27,6 @@ mod multi_signer; mod runtime; pub mod services; mod signer_registerer; -mod snapshotter; mod store; mod tools; @@ -49,10 +48,6 @@ pub use signer_registerer::{ MithrilSignerRegisterer, SignerRecorder, SignerRegisterer, SignerRegistrationError, SignerRegistrationRound, SignerRegistrationRoundOpener, }; -pub use snapshotter::{ - CompressedArchiveSnapshotter, DumbSnapshotter, SnapshotError, Snapshotter, - SnapshotterCompressionAlgorithm, -}; pub use store::{CertificatePendingStorer, EpochSettingsStorer, VerificationKeyStorer}; pub use tools::{ CExplorerSignerRetriever, SignersImporter, SignersImporterPersister, SignersImporterRetriever, diff --git a/mithril-aggregator/src/services/mod.rs b/mithril-aggregator/src/services/mod.rs index 9caaef097d5..38ab8d690c5 100644 --- a/mithril-aggregator/src/services/mod.rs +++ b/mithril-aggregator/src/services/mod.rs @@ -16,6 +16,7 @@ mod message; mod prover; mod signable_builder; mod signed_entity; +mod snapshotter; mod stake_distribution; mod upkeep; mod usage_reporter; @@ -27,6 +28,7 @@ pub use message::*; pub use prover::*; pub use signable_builder::*; pub use signed_entity::*; +pub use snapshotter::*; pub use stake_distribution::*; pub use upkeep::*; pub use usage_reporter::*; diff --git a/mithril-aggregator/src/services/snapshotter/appender.rs b/mithril-aggregator/src/services/snapshotter/appender.rs new file mode 100644 index 00000000000..a65a7a29355 --- /dev/null +++ b/mithril-aggregator/src/services/snapshotter/appender.rs @@ -0,0 +1,193 @@ +use anyhow::{anyhow, Context}; +use std::fs::File; +use std::io::Write; +use std::path::PathBuf; + +use mithril_common::StdResult; + +use crate::services::SnapshotError; + +/// Define multiple ways to append content to a tar archive. +pub(super) trait TarAppender { + fn append(&self, tar: &mut tar::Builder) -> StdResult<()>; +} + +pub(super) struct AppenderDirAll { + pub db_directory: PathBuf, +} + +impl TarAppender for AppenderDirAll { + fn append(&self, tar: &mut tar::Builder) -> StdResult<()> { + tar.append_dir_all(".", &self.db_directory) + .map_err(SnapshotError::CreateArchiveError) + .with_context(|| { + format!( + "Can not add directory: '{}' to the archive", + self.db_directory.display() + ) + })?; + Ok(()) + } +} + +pub(super) struct AppenderEntries { + pub(super) entries: Vec, + pub(super) db_directory: PathBuf, +} + +impl TarAppender for AppenderEntries { + fn append(&self, tar: &mut tar::Builder) -> StdResult<()> { + for entry in &self.entries { + let entry_path = self.db_directory.join(entry); + if entry_path.is_dir() { + tar.append_dir_all(entry, entry_path.clone()) + .with_context(|| { + format!( + "Can not add directory: '{}' to the archive", + entry_path.display() + ) + })?; + } else if entry_path.is_file() { + let mut file = File::open(entry_path.clone())?; + tar.append_file(entry, &mut file).with_context(|| { + format!( + "Can not add file: '{}' to the archive", + entry_path.display() + ) + })?; + } else { + return Err(anyhow!( + "The entry: '{}' is not valid", + entry_path.display() + )); + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::path::Path; + + use crate::services::snapshotter::test_tools::*; + use crate::services::{ + CompressedArchiveSnapshotter, Snapshotter, SnapshotterCompressionAlgorithm, + }; + use crate::test_tools::TestLogger; + + use super::*; + + #[test] + fn snapshot_subset_should_create_archive_only_for_specified_directories_and_files() { + let test_dir = get_test_directory("only_for_specified_directories_and_files"); + let destination = test_dir.join(create_dir(&test_dir, "destination")); + let source = test_dir.join(create_dir(&test_dir, "source")); + + let directory_to_archive_path = create_dir(&source, "directory_to_archive"); + let file_to_archive_path = create_file(&source, "file_to_archive.txt"); + let directory_not_to_archive_path = create_dir(&source, "directory_not_to_archive"); + let file_not_to_archive_path = create_file(&source, "file_not_to_archive.txt"); + + let snapshotter = CompressedArchiveSnapshotter::new( + source, + destination, + SnapshotterCompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + let snapshot = snapshotter + .snapshot_subset( + Path::new(&random_archive_name()), + vec![ + directory_to_archive_path.clone(), + file_to_archive_path.clone(), + ], + ) + .unwrap(); + + let unpack_path = unpack_gz_decoder(test_dir, snapshot); + + assert!(unpack_path.join(directory_to_archive_path).is_dir()); + assert!(unpack_path.join(file_to_archive_path).is_file()); + assert!(!unpack_path.join(directory_not_to_archive_path).exists()); + assert!(!unpack_path.join(file_not_to_archive_path).exists()); + } + + #[test] + fn snapshot_subset_return_error_when_file_or_directory_not_exist() { + let test_dir = get_test_directory("file_or_directory_not_exist"); + let destination = test_dir.join(create_dir(&test_dir, "destination")); + let source = test_dir.join(create_dir(&test_dir, "source")); + + let snapshotter = CompressedArchiveSnapshotter::new( + source, + destination, + SnapshotterCompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + snapshotter + .snapshot_subset( + Path::new(&random_archive_name()), + vec![PathBuf::from("not_exist")], + ) + .expect_err("snapshot_subset should return error when file or directory not exist"); + } + + #[test] + fn snapshot_subset_return_error_when_empty_entries() { + let test_dir = get_test_directory("empty_entries"); + let destination = test_dir.join(create_dir(&test_dir, "destination")); + let source = test_dir.join(create_dir(&test_dir, "source")); + + let snapshotter = CompressedArchiveSnapshotter::new( + source, + destination, + SnapshotterCompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + snapshotter + .snapshot_subset(Path::new(&random_archive_name()), vec![]) + .expect_err("snapshot_subset should return error when entries is empty"); + } + + #[test] + fn snapshot_subset_with_duplicate_files_and_directories() { + let test_dir = get_test_directory("with_duplicate_files_and_directories"); + let destination = test_dir.join(create_dir(&test_dir, "destination")); + let source = test_dir.join(create_dir(&test_dir, "source")); + + let directory_to_archive_path = create_dir(&source, "directory_to_archive"); + let file_to_archive_path = create_file(&source, "directory_to_archive/file_to_archive.txt"); + + let snapshotter = CompressedArchiveSnapshotter::new( + source, + destination, + SnapshotterCompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + let snapshot = snapshotter + .snapshot_subset( + Path::new(&random_archive_name()), + vec![ + directory_to_archive_path.clone(), + directory_to_archive_path.clone(), + file_to_archive_path.clone(), + file_to_archive_path.clone(), + ], + ) + .unwrap(); + + let unpack_path = unpack_gz_decoder(test_dir, snapshot); + + assert!(unpack_path.join(directory_to_archive_path).is_dir()); + assert!(unpack_path.join(file_to_archive_path).is_file()); + } +} diff --git a/mithril-aggregator/src/snapshotter.rs b/mithril-aggregator/src/services/snapshotter/compressed_archive_snapshotter.rs similarity index 60% rename from mithril-aggregator/src/snapshotter.rs rename to mithril-aggregator/src/services/snapshotter/compressed_archive_snapshotter.rs index 23f8393270e..9d2656781f1 100644 --- a/mithril-aggregator/src/snapshotter.rs +++ b/mithril-aggregator/src/services/snapshotter/compressed_archive_snapshotter.rs @@ -1,110 +1,24 @@ use anyhow::{anyhow, Context}; -use flate2::Compression; -use flate2::{read::GzDecoder, write::GzEncoder}; +use flate2::{read::GzDecoder, write::GzEncoder, Compression}; use slog::{info, warn, Logger}; -use std::fs::{self, File}; -use std::io::{self, Read, Seek, SeekFrom, Write}; -use std::path::{Path, PathBuf}; -use std::sync::RwLock; +use std::{ + fs, + fs::File, + io::{Read, Seek, SeekFrom}, + path::{Path, PathBuf}, +}; use tar::{Archive, Entry, EntryType}; -use thiserror::Error; use zstd::{Decoder, Encoder}; use mithril_common::logging::LoggerExtensions; use mithril_common::StdResult; +use super::{ + appender::{AppenderDirAll, AppenderEntries, TarAppender}, + OngoingSnapshot, SnapshotError, Snapshotter, SnapshotterCompressionAlgorithm, +}; use crate::dependency_injection::DependenciesBuilderError; -use crate::ZstandardCompressionParameters; -#[cfg_attr(test, mockall::automock)] -/// Define the ability to create snapshots. -pub trait Snapshotter: Sync + Send { - /// Create a new snapshot with the given filepath. - fn snapshot_all(&self, filepath: &Path) -> StdResult; - - /// Create a new snapshot with the given filepath from a subset of directories and files. - fn snapshot_subset(&self, filepath: &Path, files: Vec) -> StdResult; - - /// Check if the snapshot exists. - fn does_snapshot_exist(&self, filepath: &Path) -> bool; - - /// Give the full target path for the filepath. - fn get_file_path(&self, filepath: &Path) -> PathBuf; -} - -/// Compression algorithm and parameters of the [CompressedArchiveSnapshotter]. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum SnapshotterCompressionAlgorithm { - /// Gzip compression format - Gzip, - /// Zstandard compression format - Zstandard(ZstandardCompressionParameters), -} - -impl From for SnapshotterCompressionAlgorithm { - fn from(params: ZstandardCompressionParameters) -> Self { - Self::Zstandard(params) - } -} - -/// Define multiple ways to append content to a tar archive. -trait TarAppender { - fn append(&self, tar: &mut tar::Builder) -> StdResult<()>; -} - -struct AppenderDirAll { - db_directory: PathBuf, -} - -impl TarAppender for AppenderDirAll { - fn append(&self, tar: &mut tar::Builder) -> StdResult<()> { - tar.append_dir_all(".", &self.db_directory) - .map_err(SnapshotError::CreateArchiveError) - .with_context(|| { - format!( - "Can not add directory: '{}' to the archive", - self.db_directory.display() - ) - })?; - Ok(()) - } -} - -struct AppenderEntries { - entries: Vec, - db_directory: PathBuf, -} - -impl TarAppender for AppenderEntries { - fn append(&self, tar: &mut tar::Builder) -> StdResult<()> { - for entry in &self.entries { - let entry_path = self.db_directory.join(entry); - if entry_path.is_dir() { - tar.append_dir_all(entry, entry_path.clone()) - .with_context(|| { - format!( - "Can not add directory: '{}' to the archive", - entry_path.display() - ) - })?; - } else if entry_path.is_file() { - let mut file = File::open(entry_path.clone())?; - tar.append_file(entry, &mut file).with_context(|| { - format!( - "Can not add file: '{}' to the archive", - entry_path.display() - ) - })?; - } else { - return Err(anyhow!( - "The entry: '{}' is not valid", - entry_path.display() - )); - } - } - Ok(()) - } -} /// Compressed Archive Snapshotter create a compressed file. pub struct CompressedArchiveSnapshotter { /// DB directory to snapshot @@ -122,50 +36,6 @@ pub struct CompressedArchiveSnapshotter { logger: Logger, } -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct OngoingSnapshot { - filepath: PathBuf, - filesize: u64, -} - -impl OngoingSnapshot { - pub fn new(filepath: PathBuf, filesize: u64) -> Self { - Self { filepath, filesize } - } - - pub fn get_file_path(&self) -> &PathBuf { - &self.filepath - } - - pub fn get_file_size(&self) -> &u64 { - &self.filesize - } -} - -/// Snapshotter error type. -#[derive(Error, Debug)] -pub enum SnapshotError { - /// Set when the snapshotter fails at creating a snapshot. - #[error("Create archive error: {0}")] - CreateArchiveError(#[from] io::Error), - - /// Set when the snapshotter creates an invalid snapshot. - #[error("Invalid archive error: {0}")] - InvalidArchiveError(String), - - /// Set when the snapshotter fails verifying a snapshot. - #[error("Archive verification error: {0}")] - VerifyArchiveError(String), - - /// Set when the snapshotter fails at uploading the snapshot. - #[error("Upload file error: `{0}`")] - UploadFileError(String), - - /// General error. - #[error("Snapshot General Error: `{0}`")] - GeneralError(String), -} - impl Snapshotter for CompressedArchiveSnapshotter { fn snapshot_all(&self, filepath: &Path) -> StdResult { let appender = AppenderDirAll { @@ -463,159 +333,18 @@ impl CompressedArchiveSnapshotter { } } -/// Snapshotter that does nothing. It is mainly used for test purposes. -pub struct DumbSnapshotter { - last_snapshot: RwLock>, -} - -impl DumbSnapshotter { - /// Create a new instance of DumbSnapshotter. - pub fn new() -> Self { - Self { - last_snapshot: RwLock::new(None), - } - } - - /// Return the last fake snapshot produced. - pub fn get_last_snapshot(&self) -> StdResult> { - let value = self - .last_snapshot - .read() - .map_err(|e| SnapshotError::UploadFileError(e.to_string()))? - .as_ref() - .cloned(); - - Ok(value) - } -} - -impl Default for DumbSnapshotter { - fn default() -> Self { - Self::new() - } -} - -impl Snapshotter for DumbSnapshotter { - fn snapshot_all(&self, archive_name: &Path) -> StdResult { - let mut value = self - .last_snapshot - .write() - .map_err(|e| SnapshotError::UploadFileError(e.to_string()))?; - let snapshot = OngoingSnapshot { - filepath: self.get_file_path(archive_name), - filesize: 0, - }; - *value = Some(snapshot.clone()); - - Ok(snapshot) - } - - fn snapshot_subset( - &self, - archive_name: &Path, - _files: Vec, - ) -> StdResult { - self.snapshot_all(archive_name) - } - - fn does_snapshot_exist(&self, _filepath: &Path) -> bool { - false - } - - fn get_file_path(&self, filepath: &Path) -> PathBuf { - filepath.to_path_buf() - } -} - #[cfg(test)] mod tests { use std::sync::Arc; - use uuid::Uuid; - - use mithril_common::{digesters::DummyCardanoDbBuilder, test_utils::TempDir}; + use mithril_common::digesters::DummyCardanoDbBuilder; + use crate::services::snapshotter::test_tools::*; use crate::test_tools::TestLogger; + use crate::ZstandardCompressionParameters; use super::*; - fn get_test_directory(dir_name: &str) -> PathBuf { - TempDir::create("snapshotter", dir_name) - } - - fn create_file(root: &Path, filename: &str) -> PathBuf { - let file_path = PathBuf::from(filename); - File::create(root.join(file_path.clone())).unwrap(); - file_path - } - - fn create_dir(root: &Path, dirname: &str) -> PathBuf { - let dir_path = PathBuf::from(dirname); - std::fs::create_dir(root.join(dir_path.clone())).unwrap(); - dir_path - } - - fn unpack_gz_decoder(test_dir: PathBuf, snapshot: OngoingSnapshot) -> PathBuf { - let file_tar_gz = File::open(snapshot.get_file_path()).unwrap(); - let file_tar_gz_decoder = GzDecoder::new(file_tar_gz); - let mut archive = Archive::new(file_tar_gz_decoder); - let unpack_path = test_dir.join(create_dir(&test_dir, "unpack")); - archive.unpack(&unpack_path).unwrap(); - - unpack_path - } - - // Generate unique name for the archive is mandatory to avoid conflicts during the verification. - fn random_archive_name() -> String { - format!("{}.tar.gz", Uuid::new_v4()) - } - - #[test] - fn test_dumb_snapshotter_snapshot_return_archive_name_with_size_0() { - let snapshotter = DumbSnapshotter::new(); - let snapshot = snapshotter - .snapshot_all(Path::new("archive.tar.gz")) - .unwrap(); - - assert_eq!(PathBuf::from("archive.tar.gz"), *snapshot.get_file_path()); - assert_eq!(0, *snapshot.get_file_size()); - - let snapshot = snapshotter - .snapshot_subset(Path::new("archive.tar.gz"), vec![PathBuf::from("whatever")]) - .unwrap(); - assert_eq!(PathBuf::from("archive.tar.gz"), *snapshot.get_file_path()); - assert_eq!(0, *snapshot.get_file_size()); - } - - #[test] - fn test_dumb_snapshotter() { - let snapshotter = DumbSnapshotter::new(); - assert!(snapshotter - .get_last_snapshot() - .expect("Dumb snapshotter::get_last_snapshot should not fail when no last snapshot.") - .is_none()); - - let snapshot = snapshotter - .snapshot_all(Path::new("whatever")) - .expect("Dumb snapshotter::snapshot should not fail."); - assert_eq!( - Some(snapshot), - snapshotter.get_last_snapshot().expect( - "Dumb snapshotter::get_last_snapshot should not fail when some last snapshot." - ) - ); - - let snapshot = snapshotter - .snapshot_subset(Path::new("another_whatever"), vec![PathBuf::from("subdir")]) - .expect("Dumb snapshotter::snapshot should not fail."); - assert_eq!( - Some(snapshot), - snapshotter.get_last_snapshot().expect( - "Dumb snapshotter::get_last_snapshot should not fail when some last snapshot." - ) - ); - } - #[test] fn should_create_directory_if_does_not_exist() { let test_dir = get_test_directory("should_create_directory_if_does_not_exist"); @@ -772,119 +501,6 @@ mod tests { .expect("Snapshotter::snapshot should not fail."); } - #[test] - fn snapshot_subset_should_create_archive_only_for_specified_directories_and_files() { - let test_dir = get_test_directory("only_for_specified_directories_and_files"); - let destination = test_dir.join(create_dir(&test_dir, "destination")); - let source = test_dir.join(create_dir(&test_dir, "source")); - - let directory_to_archive_path = create_dir(&source, "directory_to_archive"); - let file_to_archive_path = create_file(&source, "file_to_archive.txt"); - let directory_not_to_archive_path = create_dir(&source, "directory_not_to_archive"); - let file_not_to_archive_path = create_file(&source, "file_not_to_archive.txt"); - - let snapshotter = CompressedArchiveSnapshotter::new( - source, - destination, - SnapshotterCompressionAlgorithm::Gzip, - TestLogger::stdout(), - ) - .unwrap(); - - let snapshot = snapshotter - .snapshot_subset( - Path::new(&random_archive_name()), - vec![ - directory_to_archive_path.clone(), - file_to_archive_path.clone(), - ], - ) - .unwrap(); - - let unpack_path = unpack_gz_decoder(test_dir, snapshot); - - assert!(unpack_path.join(directory_to_archive_path).is_dir()); - assert!(unpack_path.join(file_to_archive_path).is_file()); - assert!(!unpack_path.join(directory_not_to_archive_path).exists()); - assert!(!unpack_path.join(file_not_to_archive_path).exists()); - } - - #[test] - fn snapshot_subset_return_error_when_file_or_directory_not_exist() { - let test_dir = get_test_directory("file_or_directory_not_exist"); - let destination = test_dir.join(create_dir(&test_dir, "destination")); - let source = test_dir.join(create_dir(&test_dir, "source")); - - let snapshotter = CompressedArchiveSnapshotter::new( - source, - destination, - SnapshotterCompressionAlgorithm::Gzip, - TestLogger::stdout(), - ) - .unwrap(); - - snapshotter - .snapshot_subset( - Path::new(&random_archive_name()), - vec![PathBuf::from("not_exist")], - ) - .expect_err("snapshot_subset should return error when file or directory not exist"); - } - - #[test] - fn snapshot_subset_return_error_when_empty_entries() { - let test_dir = get_test_directory("empty_entries"); - let destination = test_dir.join(create_dir(&test_dir, "destination")); - let source = test_dir.join(create_dir(&test_dir, "source")); - - let snapshotter = CompressedArchiveSnapshotter::new( - source, - destination, - SnapshotterCompressionAlgorithm::Gzip, - TestLogger::stdout(), - ) - .unwrap(); - - snapshotter - .snapshot_subset(Path::new(&random_archive_name()), vec![]) - .expect_err("snapshot_subset should return error when entries is empty"); - } - - #[test] - fn snapshot_subset_with_duplicate_files_and_directories() { - let test_dir = get_test_directory("with_duplicate_files_and_directories"); - let destination = test_dir.join(create_dir(&test_dir, "destination")); - let source = test_dir.join(create_dir(&test_dir, "source")); - - let directory_to_archive_path = create_dir(&source, "directory_to_archive"); - let file_to_archive_path = create_file(&source, "directory_to_archive/file_to_archive.txt"); - - let snapshotter = CompressedArchiveSnapshotter::new( - source, - destination, - SnapshotterCompressionAlgorithm::Gzip, - TestLogger::stdout(), - ) - .unwrap(); - - let snapshot = snapshotter - .snapshot_subset( - Path::new(&random_archive_name()), - vec![ - directory_to_archive_path.clone(), - directory_to_archive_path.clone(), - file_to_archive_path.clone(), - file_to_archive_path.clone(), - ], - ) - .unwrap(); - - let unpack_path = unpack_gz_decoder(test_dir, snapshot); - - assert!(unpack_path.join(directory_to_archive_path).is_dir()); - assert!(unpack_path.join(file_to_archive_path).is_file()); - } - #[test] fn snapshot_overwrite_archive_already_existing() { let test_dir = get_test_directory("snapshot_overwrite_archive_already_existing"); diff --git a/mithril-aggregator/src/services/snapshotter/dumb_snapshotter.rs b/mithril-aggregator/src/services/snapshotter/dumb_snapshotter.rs new file mode 100644 index 00000000000..eb8bc856899 --- /dev/null +++ b/mithril-aggregator/src/services/snapshotter/dumb_snapshotter.rs @@ -0,0 +1,121 @@ +use std::path::{Path, PathBuf}; +use std::sync::RwLock; + +use mithril_common::StdResult; + +use crate::services::{OngoingSnapshot, SnapshotError, Snapshotter}; + +/// Snapshotter that does nothing. It is mainly used for test purposes. +pub struct DumbSnapshotter { + last_snapshot: RwLock>, +} + +impl DumbSnapshotter { + /// Create a new instance of DumbSnapshotter. + pub fn new() -> Self { + Self { + last_snapshot: RwLock::new(None), + } + } + + /// Return the last fake snapshot produced. + pub fn get_last_snapshot(&self) -> StdResult> { + let value = self + .last_snapshot + .read() + .map_err(|e| SnapshotError::UploadFileError(e.to_string()))? + .as_ref() + .cloned(); + + Ok(value) + } +} + +impl Default for DumbSnapshotter { + fn default() -> Self { + Self::new() + } +} + +impl Snapshotter for DumbSnapshotter { + fn snapshot_all(&self, archive_name: &Path) -> StdResult { + let mut value = self + .last_snapshot + .write() + .map_err(|e| SnapshotError::UploadFileError(e.to_string()))?; + let snapshot = OngoingSnapshot { + filepath: archive_name.to_path_buf(), + filesize: 0, + }; + *value = Some(snapshot.clone()); + + Ok(snapshot) + } + + fn snapshot_subset( + &self, + archive_name: &Path, + _files: Vec, + ) -> StdResult { + self.snapshot_all(archive_name) + } + + fn does_snapshot_exist(&self, _filepath: &Path) -> bool { + false + } + + fn get_file_path(&self, filepath: &Path) -> PathBuf { + filepath.to_path_buf() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dumb_snapshotter_snapshot_return_archive_name_with_size_0() { + let snapshotter = DumbSnapshotter::new(); + let snapshot = snapshotter + .snapshot_all(Path::new("archive.tar.gz")) + .unwrap(); + + assert_eq!(PathBuf::from("archive.tar.gz"), *snapshot.get_file_path()); + assert_eq!(0, *snapshot.get_file_size()); + + let snapshot = snapshotter + .snapshot_subset(Path::new("archive.tar.gz"), vec![PathBuf::from("whatever")]) + .unwrap(); + assert_eq!(PathBuf::from("archive.tar.gz"), *snapshot.get_file_path()); + assert_eq!(0, *snapshot.get_file_size()); + } + + #[test] + fn test_dumb_snapshotter() { + let snapshotter = DumbSnapshotter::new(); + assert!(snapshotter + .get_last_snapshot() + .expect("Dumb snapshotter::get_last_snapshot should not fail when no last snapshot.") + .is_none()); + + let snapshot = snapshotter + .snapshot_all(Path::new("whatever")) + .expect("Dumb snapshotter::snapshot should not fail."); + assert_eq!( + Some(snapshot), + snapshotter.get_last_snapshot().expect( + "Dumb snapshotter::get_last_snapshot should not fail when some last snapshot." + ) + ); + + let snapshot = snapshotter + .snapshot_subset(Path::new("another_whatever"), vec![PathBuf::from("subdir")]) + .expect("Dumb snapshotter::snapshot should not fail."); + assert_eq!( + Some(snapshot), + snapshotter.get_last_snapshot().expect( + "Dumb snapshotter::get_last_snapshot should not fail when some last snapshot." + ) + ); + } +} diff --git a/mithril-aggregator/src/services/snapshotter/interface.rs b/mithril-aggregator/src/services/snapshotter/interface.rs new file mode 100644 index 00000000000..afe19878e52 --- /dev/null +++ b/mithril-aggregator/src/services/snapshotter/interface.rs @@ -0,0 +1,86 @@ +use std::io; +use std::path::{Path, PathBuf}; +use thiserror::Error; + +use mithril_common::StdResult; + +use crate::ZstandardCompressionParameters; + +#[cfg_attr(test, mockall::automock)] +/// Define the ability to create snapshots. +pub trait Snapshotter: Sync + Send { + /// Create a new snapshot with the given filepath. + fn snapshot_all(&self, filepath: &Path) -> StdResult; + + /// Create a new snapshot with the given filepath from a subset of directories and files. + fn snapshot_subset(&self, filepath: &Path, files: Vec) -> StdResult; + + /// Check if the snapshot exists. + fn does_snapshot_exist(&self, filepath: &Path) -> bool; + + /// Give the full target path for the filepath. + fn get_file_path(&self, filepath: &Path) -> PathBuf; +} + +/// Snapshotter error type. +#[derive(Error, Debug)] +pub enum SnapshotError { + /// Set when the snapshotter fails at creating a snapshot. + #[error("Create archive error: {0}")] + CreateArchiveError(#[from] io::Error), + + /// Set when the snapshotter creates an invalid snapshot. + #[error("Invalid archive error: {0}")] + InvalidArchiveError(String), + + /// Set when the snapshotter fails verifying a snapshot. + #[error("Archive verification error: {0}")] + VerifyArchiveError(String), + + /// Set when the snapshotter fails at uploading the snapshot. + #[error("Upload file error: `{0}`")] + UploadFileError(String), + + /// General error. + #[error("Snapshot General Error: `{0}`")] + GeneralError(String), +} + +/// Compression algorithm and parameters of the [crate::services::CompressedArchiveSnapshotter]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SnapshotterCompressionAlgorithm { + /// Gzip compression format + Gzip, + /// Zstandard compression format + Zstandard(ZstandardCompressionParameters), +} + +impl From for SnapshotterCompressionAlgorithm { + fn from(params: ZstandardCompressionParameters) -> Self { + Self::Zstandard(params) + } +} + +/// An ongoing snapshot is a snapshot that is not yet uploaded. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct OngoingSnapshot { + pub(super) filepath: PathBuf, + pub(super) filesize: u64, +} + +impl OngoingSnapshot { + /// `OngoingSnapshot` factory + pub fn new(filepath: PathBuf, filesize: u64) -> Self { + Self { filepath, filesize } + } + + /// Get the path of the snapshot archive. + pub fn get_file_path(&self) -> &PathBuf { + &self.filepath + } + + /// Get the size of the snapshot archive. + pub fn get_file_size(&self) -> &u64 { + &self.filesize + } +} diff --git a/mithril-aggregator/src/services/snapshotter/mod.rs b/mithril-aggregator/src/services/snapshotter/mod.rs new file mode 100644 index 00000000000..329c5739b81 --- /dev/null +++ b/mithril-aggregator/src/services/snapshotter/mod.rs @@ -0,0 +1,52 @@ +mod appender; +mod compressed_archive_snapshotter; +mod dumb_snapshotter; +mod interface; + +pub use compressed_archive_snapshotter::*; +pub use dumb_snapshotter::*; +pub use interface::*; + +#[cfg(test)] +pub(crate) mod test_tools { + use flate2::read::GzDecoder; + use std::fs::File; + use std::path::{Path, PathBuf}; + use tar::Archive; + use uuid::Uuid; + + use mithril_common::test_utils::TempDir; + + use crate::services::OngoingSnapshot; + + pub fn get_test_directory(dir_name: &str) -> PathBuf { + TempDir::create("snapshotter", dir_name) + } + + pub fn create_file(root: &Path, filename: &str) -> PathBuf { + let file_path = PathBuf::from(filename); + File::create(root.join(file_path.clone())).unwrap(); + file_path + } + + pub fn create_dir(root: &Path, dirname: &str) -> PathBuf { + let dir_path = PathBuf::from(dirname); + std::fs::create_dir(root.join(dir_path.clone())).unwrap(); + dir_path + } + + // Generate unique name for the archive is mandatory to avoid conflicts during the verification. + pub fn random_archive_name() -> String { + format!("{}.tar.gz", Uuid::new_v4()) + } + + pub fn unpack_gz_decoder(test_dir: PathBuf, snapshot: OngoingSnapshot) -> PathBuf { + let file_tar_gz = File::open(snapshot.get_file_path()).unwrap(); + let file_tar_gz_decoder = GzDecoder::new(file_tar_gz); + let mut archive = Archive::new(file_tar_gz_decoder); + let unpack_path = test_dir.join(create_dir(&test_dir, "unpack")); + archive.unpack(&unpack_path).unwrap(); + + unpack_path + } +} diff --git a/mithril-aggregator/tests/test_extensions/runtime_tester.rs b/mithril-aggregator/tests/test_extensions/runtime_tester.rs index 4c707632a91..3b22296bf86 100644 --- a/mithril-aggregator/tests/test_extensions/runtime_tester.rs +++ b/mithril-aggregator/tests/test_extensions/runtime_tester.rs @@ -7,8 +7,8 @@ use mithril_aggregator::{ database::{record::SignedEntityRecord, repository::OpenMessageRepository}, dependency_injection::DependenciesBuilder, event_store::EventMessage, - AggregatorRuntime, Configuration, DependencyContainer, DumbSnapshotter, DumbUploader, - SignerRegistrationError, + services::DumbSnapshotter, + AggregatorRuntime, Configuration, DependencyContainer, DumbUploader, SignerRegistrationError, }; use mithril_common::{ cardano_block_scanner::{DumbBlockScanner, ScannedBlock},