Skip to content

Commit

Permalink
Merge pull request #2251 from grumbach/improved_archives
Browse files Browse the repository at this point in the history
feat: improved archives with metadata
  • Loading branch information
grumbach authored Oct 17, 2024
2 parents 6c4ad11 + f7a9d4a commit 7e1027d
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 40 deletions.
6 changes: 3 additions & 3 deletions autonomi-cli/src/actions/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ pub async fn download(addr: &str, dest_path: &str, client: &mut Client) -> Resul
.await
.wrap_err("Failed to fetch data from address")?;

let progress_bar = get_progress_bar(archive.map.len() as u64)?;
let progress_bar = get_progress_bar(archive.iter().count() as u64)?;
let mut all_errs = vec![];
for (path, addr) in archive.map {
for (path, addr, _meta) in archive.iter() {
progress_bar.println(format!("Fetching file: {path:?}..."));
let bytes = match client.data_get(addr).await {
let bytes = match client.data_get(*addr).await {
Ok(bytes) => bytes,
Err(e) => {
let err = format!("Failed to fetch file {path:?}: {e}");
Expand Down
112 changes: 110 additions & 2 deletions autonomi/src/client/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use std::{collections::HashMap, path::PathBuf};
use std::{
collections::HashMap,
path::{Path, PathBuf},
};

use sn_networking::target_arch::{Duration, SystemTime, UNIX_EPOCH};

use super::{
data::DataAddr,
Expand All @@ -21,11 +26,46 @@ use xor_name::XorName;
/// The address of an archive on the network. Points to an [`Archive`].
pub type ArchiveAddr = XorName;

use thiserror::Error;

#[derive(Error, Debug, PartialEq, Eq)]
pub enum RenameError {
#[error("File not found in archive: {0}")]
FileNotFound(PathBuf),
}

/// An archive of files that containing file paths, their metadata and the files data addresses
/// Using archives is useful for uploading entire directories to the network, only needing to keep track of a single address.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Archive {
pub map: HashMap<PathBuf, DataAddr>,
map: HashMap<PathBuf, (DataAddr, Metadata)>,
}

/// Metadata for a file in an archive
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Metadata {
pub created: u64,
pub modified: u64,
}

impl Metadata {
/// Create a new metadata struct
pub fn new() -> Self {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::from_secs(0))
.as_secs();
Self {
created: now,
modified: now,
}
}
}

impl Default for Metadata {
fn default() -> Self {
Self::new()
}
}

impl Archive {
Expand All @@ -43,6 +83,74 @@ impl Archive {

Ok(root_serialized)
}

/// Rename a file in an archive
/// Note that this does not upload the archive to the network
pub fn rename_file(&mut self, old_path: &Path, new_path: &Path) -> Result<(), RenameError> {
let (data_addr, mut meta) = self
.map
.remove(old_path)
.ok_or(RenameError::FileNotFound(old_path.to_path_buf()))?;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::from_secs(0))
.as_secs();
meta.modified = now;
self.map.insert(new_path.to_path_buf(), (data_addr, meta));
Ok(())
}

/// Create a new emtpy local archive
/// Note that this does not upload the archive to the network
pub fn new() -> Self {
Self {
map: HashMap::new(),
}
}

/// Add a file to a local archive
/// Note that this does not upload the archive to the network
pub fn add_file(&mut self, path: PathBuf, data_addr: DataAddr, meta: Metadata) {
self.map.insert(path, (data_addr, meta));
}

/// Add a file to a local archive, with default metadata
/// Note that this does not upload the archive to the network
pub fn add_new_file(&mut self, path: PathBuf, data_addr: DataAddr) {
self.map.insert(path, (data_addr, Metadata::new()));
}

/// List all files in the archive
pub fn files(&self) -> Vec<(PathBuf, Metadata)> {
self.map
.iter()
.map(|(path, (_, meta))| (path.clone(), meta.clone()))
.collect()
}

/// List all data addresses of the files in the archive
pub fn addresses(&self) -> Vec<DataAddr> {
self.map.values().map(|(addr, _)| *addr).collect()
}

/// Iterate over the archive items
/// Returns an iterator over (PathBuf, DataAddr, Metadata)
pub fn iter(&self) -> impl Iterator<Item = (&PathBuf, &DataAddr, &Metadata)> {
self.map
.iter()
.map(|(path, (addr, meta))| (path, addr, meta))
}

/// Get the underlying map
pub fn map(&self) -> &HashMap<PathBuf, (DataAddr, Metadata)> {
&self.map
}
}

impl Default for Archive {
fn default() -> Self {
Self::new()
}
}

impl Client {
Expand Down
23 changes: 16 additions & 7 deletions autonomi/src/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,12 @@ pub enum PutError {
/// Errors that can occur during the pay operation.
#[derive(Debug, thiserror::Error)]
pub enum PayError {
#[error("Could not get store quote for: {0:?} after several retries")]
CouldNotGetStoreQuote(XorName),
#[error("Could not get store costs: {0:?}")]
CouldNotGetStoreCosts(NetworkError),
#[error("Could not simultaneously fetch store costs: {0:?}")]
JoinError(JoinError),
#[error("Wallet error: {0:?}")]
EvmWalletError(#[from] EvmWalletError),
#[error("Failed to self-encrypt data.")]
SelfEncryption(#[from] crate::self_encryption::Error),
#[error("Cost error: {0:?}")]
Cost(#[from] CostError),
}

/// Errors that can occur during the get operation.
Expand All @@ -75,6 +71,19 @@ pub enum GetError {
Protocol(#[from] sn_protocol::Error),
}

/// Errors that can occur during the cost calculation.
#[derive(Debug, thiserror::Error)]
pub enum CostError {
#[error("Could not simultaneously fetch store costs: {0:?}")]
JoinError(JoinError),
#[error("Failed to self-encrypt data.")]
SelfEncryption(#[from] crate::self_encryption::Error),
#[error("Could not get store quote for: {0:?} after several retries")]
CouldNotGetStoreQuote(XorName),
#[error("Could not get store costs: {0:?}")]
CouldNotGetStoreCosts(NetworkError),
}

impl Client {
/// Fetch a blob of data from the network
pub async fn data_get(&self, addr: DataAddr) -> Result<Bytes, GetError> {
Expand Down Expand Up @@ -184,7 +193,7 @@ impl Client {
}

/// Get the estimated cost of storing a piece of data.
pub async fn data_cost(&self, data: Bytes) -> Result<AttoTokens, PayError> {
pub async fn data_cost(&self, data: Bytes) -> Result<AttoTokens, CostError> {
let now = sn_networking::target_arch::Instant::now();
let (data_map_chunk, chunks) = encrypt(data)?;

Expand Down
46 changes: 29 additions & 17 deletions autonomi/src/client/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::client::archive::Metadata;
use crate::client::data::CostError;
use crate::client::Client;
use bytes::Bytes;
use sn_evm::EvmWallet;
use std::collections::HashMap;
use std::path::PathBuf;

use super::archive::{Archive, ArchiveAddr};
Expand Down Expand Up @@ -43,6 +44,22 @@ pub enum DownloadError {
IoError(#[from] std::io::Error),
}

#[cfg(feature = "fs")]
/// Errors that can occur during the file cost calculation.
#[derive(Debug, thiserror::Error)]
pub enum FileCostError {
#[error("Cost error: {0}")]
Cost(#[from] CostError),
#[error("IO failure")]
IoError(#[from] std::io::Error),
#[error("Serialization error")]
Serialization(#[from] rmp_serde::encode::Error),
#[error("Self encryption error")]
SelfEncryption(#[from] crate::self_encryption::Error),
#[error("Walkdir error")]
WalkDir(#[from] walkdir::Error),
}

impl Client {
/// Download file from network to local file system
pub async fn file_download(
Expand All @@ -65,8 +82,8 @@ impl Client {
to_dest: PathBuf,
) -> Result<(), DownloadError> {
let archive = self.archive_get(archive_addr).await?;
for (path, addr) in archive.map {
self.file_download(addr, to_dest.join(path)).await?;
for (path, addr, _meta) in archive.iter() {
self.file_download(*addr, to_dest.join(path)).await?;
}
Ok(())
}
Expand All @@ -78,7 +95,7 @@ impl Client {
dir_path: PathBuf,
wallet: &EvmWallet,
) -> Result<ArchiveAddr, UploadError> {
let mut map = HashMap::new();
let mut archive = Archive::new();

for entry in walkdir::WalkDir::new(dir_path) {
let entry = entry?;
Expand All @@ -93,10 +110,9 @@ impl Client {
println!("Uploading file: {path:?}");
let file = self.file_upload(path.clone(), wallet).await?;

map.insert(path, file);
archive.add_file(path, file, Metadata::new());
}

let archive = Archive { map };
let archive_serialized = archive.into_bytes()?;

let arch_addr = self.data_put(archive_serialized, wallet).await?;
Expand All @@ -119,8 +135,8 @@ impl Client {

/// Get the cost to upload a file/dir to the network.
/// quick and dirty implementation, please refactor once files are cleanly implemented
pub async fn file_cost(&self, path: &PathBuf) -> Result<sn_evm::AttoTokens, UploadError> {
let mut map = HashMap::new();
pub async fn file_cost(&self, path: &PathBuf) -> Result<sn_evm::AttoTokens, FileCostError> {
let mut archive = Archive::new();
let mut total_cost = sn_evm::Amount::ZERO;

for entry in walkdir::WalkDir::new(path) {
Expand All @@ -135,27 +151,23 @@ impl Client {

let data = tokio::fs::read(&path).await?;
let file_bytes = Bytes::from(data);
let file_cost = self.data_cost(file_bytes.clone()).await.expect("TODO");
let file_cost = self.data_cost(file_bytes.clone()).await?;

total_cost += file_cost.as_atto();

// re-do encryption to get the correct map xorname here
// this code needs refactor
let now = sn_networking::target_arch::Instant::now();
let (data_map_chunk, _) = crate::self_encryption::encrypt(file_bytes).expect("TODO");
let (data_map_chunk, _) = crate::self_encryption::encrypt(file_bytes)?;
tracing::debug!("Encryption took: {:.2?}", now.elapsed());
let map_xor_name = *data_map_chunk.address().xorname();

map.insert(path, map_xor_name);
archive.add_file(path, map_xor_name, Metadata::new());
}

let root = Archive { map };
let root_serialized = rmp_serde::to_vec(&root).expect("TODO");
let root_serialized = rmp_serde::to_vec(&archive)?;

let archive_cost = self
.data_cost(Bytes::from(root_serialized))
.await
.expect("TODO");
let archive_cost = self.data_cost(Bytes::from(root_serialized)).await?;

total_cost += archive_cost.as_atto();
Ok(total_cost.into())
Expand Down
4 changes: 4 additions & 0 deletions autonomi/src/client/registers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ use sn_registers::{Permissions, RegisterCrdt, RegisterOp, SignedRegister};
use std::collections::BTreeSet;
use xor_name::XorName;

use super::data::CostError;

#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
#[error("Cost error: {0}")]
Cost(#[from] CostError),
#[error("Network error")]
Network(#[from] NetworkError),
#[error("Serialization error")]
Expand Down
8 changes: 4 additions & 4 deletions autonomi/src/client/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use xor_name::XorName;
use crate::self_encryption::DataMapLevel;

use super::{
data::{GetError, PayError, PutError},
data::{CostError, GetError, PayError, PutError},
Client,
};

Expand Down Expand Up @@ -177,7 +177,7 @@ impl Client {
pub(crate) async fn get_store_quotes(
&self,
content_addrs: impl Iterator<Item = XorName>,
) -> Result<HashMap<XorName, PayeeQuote>, PayError> {
) -> Result<HashMap<XorName, PayeeQuote>, CostError> {
let futures: Vec<_> = content_addrs
.into_iter()
.map(|content_addr| fetch_store_quote_with_retries(&self.network, content_addr))
Expand All @@ -193,7 +193,7 @@ impl Client {
async fn fetch_store_quote_with_retries(
network: &Network,
content_addr: XorName,
) -> Result<(XorName, PayeeQuote), PayError> {
) -> Result<(XorName, PayeeQuote), CostError> {
let mut retries = 0;

loop {
Expand All @@ -209,7 +209,7 @@ async fn fetch_store_quote_with_retries(
error!(
"Error while fetching store quote: {err:?}, stopping after {retries} retries"
);
break Err(PayError::CouldNotGetStoreQuote(content_addr));
break Err(CostError::CouldNotGetStoreQuote(content_addr));
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions autonomi/src/client/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl JsClient {

mod archive {
use super::*;
use crate::client::archive::Metadata;
use crate::client::{address::str_to_addr, archive::Archive};
use std::{collections::HashMap, path::PathBuf};
use xor_name::XorName;
Expand All @@ -82,7 +83,7 @@ mod archive {
let data = self.0.archive_get(addr).await?;

// To `Map<K, V>` (JS)
let data = serde_wasm_bindgen::to_value(&data.map)?;
let data = serde_wasm_bindgen::to_value(&data.map())?;
Ok(data.into())
}

Expand All @@ -93,8 +94,12 @@ mod archive {
wallet: &JsWallet,
) -> Result<String, JsError> {
// From `Map<K, V>` or `Iterable<[K, V]>` (JS)
let map: HashMap<PathBuf, XorName> = serde_wasm_bindgen::from_value(map)?;
let archive = Archive { map };
let map: HashMap<PathBuf, (XorName, Metadata)> = serde_wasm_bindgen::from_value(map)?;
let mut archive = Archive::new();

for (path, (xorname, meta)) in map {
archive.add_file(path, xorname, meta);
}

let addr = self.0.archive_put(archive, &wallet.0).await?;

Expand Down
Loading

0 comments on commit 7e1027d

Please sign in to comment.