diff --git a/autonomi/src/client/data.rs b/autonomi/src/client/data.rs index 9ee4559c20..167a4bd872 100644 --- a/autonomi/src/client/data.rs +++ b/autonomi/src/client/data.rs @@ -1,16 +1,28 @@ -use crate::client::{Client, ClientWrapper}; use crate::self_encryption::DataMapLevel; use bytes::Bytes; use evmlib::wallet; -use libp2p::kad::Quorum; +use libp2p::kad::{Quorum, Record}; + use self_encryption::{decrypt_full_set, DataMap, EncryptedChunk}; -use sn_networking::{GetRecordCfg, NetworkError}; -use sn_protocol::storage::{try_deserialize_record, Chunk, ChunkAddress, RecordHeader, RecordKind}; -use sn_protocol::NetworkAddress; use std::collections::HashSet; use tokio::task::JoinError; use xor_name::XorName; +use crate::{self_encryption::encrypt, Client}; +use evmlib::common::{QuoteHash, QuotePayment, TxHash}; +use evmlib::wallet::Wallet; +use libp2p::futures; +use sn_evm::ProofOfPayment; +use sn_networking::PutRecordCfg; +use sn_networking::{GetRecordCfg, Network, NetworkError, PayeeQuote}; +use sn_protocol::{ + storage::{ + try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, RecordHeader, RecordKind, + }, + NetworkAddress, +}; +use std::collections::{BTreeMap, HashMap}; + /// Errors that can occur during the put operation. #[derive(Debug, thiserror::Error)] pub enum PutError { @@ -22,14 +34,18 @@ pub enum PutError { VaultXorName, #[error("A network error occurred.")] Network(#[from] NetworkError), + #[error("Error occurred during payment.")] + PayError(#[from] PayError), + + // native token #[cfg(feature = "native-payments")] #[error("A wallet error occurred.")] Wallet(#[from] sn_transfers::WalletError), + + // evm token #[cfg(feature = "evm-payments")] #[error("A wallet error occurred.")] - EvmWallet(#[from] sn_evm::EvmError), - #[error("Error occurred during payment.")] - PayError(#[from] PayError), + Wallet(#[from] sn_evm::EvmError), } /// Errors that can occur during the pay operation. @@ -144,24 +160,198 @@ impl Client { }; } } -} -pub trait Data: ClientWrapper { - async fn get(&self, data_map_addr: XorName) -> Result { - self.client().get(data_map_addr).await + /// Upload a piece of data to the network. This data will be self-encrypted, + /// and the data map XOR address will be returned. + pub async fn put(&mut self, data: Bytes, wallet: &Wallet) -> Result { + let now = std::time::Instant::now(); + let (data_map_chunk, chunks) = encrypt(data)?; + + tracing::debug!("Encryption took: {:.2?}", now.elapsed()); + + let map_xor_name = *data_map_chunk.address().xorname(); + let mut xor_names = vec![map_xor_name]; + + for chunk in &chunks { + xor_names.push(*chunk.name()); + } + + // Pay for all chunks + data map chunk + let (payment_proofs, _free_chunks) = self.pay(xor_names.into_iter(), wallet).await?; + + // Upload data map + if let Some(proof) = payment_proofs.get(&map_xor_name) { + self.upload_chunk(data_map_chunk.clone(), proof.clone()) + .await?; + } + + // Upload the rest of the chunks + for chunk in chunks { + if let Some(proof) = payment_proofs.get(chunk.name()) { + self.upload_chunk(chunk, proof.clone()).await?; + } + } + + Ok(map_xor_name) } - async fn fetch_chunk(&self, addr: XorName) -> Result { - self.client().fetch_chunk(addr).await + pub(crate) async fn pay( + &mut self, + content_addrs: impl Iterator, + wallet: &Wallet, + ) -> Result<(HashMap, Vec), PayError> { + let cost_map = self.get_store_quotes(content_addrs).await?; + let (quote_payments, skipped_chunks) = extract_quote_payments(&cost_map); + + // TODO: the error might contain some succeeded quote payments as well. These should be returned on err, so that they can be skipped when retrying. + // TODO: retry when it fails? + // Execute chunk payments + let payments = wallet + .pay_for_quotes(quote_payments) + .await + .map_err(|err| PayError::from(err.0))?; + + let proofs = construct_proofs(&cost_map, &payments); + + tracing::trace!( + "Chunk payments of {} chunks completed. {} chunks were free / already paid for", + proofs.len(), + skipped_chunks.len() + ); + + Ok((proofs, skipped_chunks)) } - async fn fetch_from_data_map(&self, data_map: &DataMap) -> Result { - self.client().fetch_from_data_map(data_map).await + async fn get_store_quotes( + &mut self, + content_addrs: impl Iterator, + ) -> Result, PayError> { + let futures: Vec<_> = content_addrs + .into_iter() + .map(|content_addr| fetch_store_quote_with_retries(&self.network, content_addr)) + .collect(); + + let quotes = futures::future::try_join_all(futures).await?; + + Ok(quotes.into_iter().collect::>()) } - async fn fetch_from_data_map_chunk(&self, data_map_bytes: &Bytes) -> Result { - self.client() - .fetch_from_data_map_chunk(data_map_bytes) - .await + /// Directly writes Chunks to the network in the form of immutable self encrypted chunks. + async fn upload_chunk( + &self, + chunk: Chunk, + proof_of_payment: ProofOfPayment, + ) -> Result<(), PutError> { + self.store_chunk(chunk, proof_of_payment).await?; + Ok(()) + } + + /// Actually store a chunk to a peer. + async fn store_chunk(&self, chunk: Chunk, payment: ProofOfPayment) -> Result<(), PutError> { + let storing_node = payment.to_peer_id_payee().expect("Missing node Peer ID"); + + tracing::debug!("Storing chunk: {chunk:?} to {:?}", storing_node); + + let key = chunk.network_address().to_record_key(); + + let record_kind = RecordKind::ChunkWithPayment; + let record = Record { + key: key.clone(), + value: try_serialize_record(&(payment, chunk.clone()), record_kind) + .map_err(|_| PutError::Serialization)? + .to_vec(), + publisher: None, + expires: None, + }; + + let put_cfg = PutRecordCfg { + put_quorum: Quorum::One, + retry_strategy: None, + use_put_record_to: Some(vec![storing_node]), + verification: None, + }; + Ok(self.network.put_record(record, &put_cfg).await?) } } + +/// Fetch a store quote for a content address with a retry strategy. +async fn fetch_store_quote_with_retries( + network: &Network, + content_addr: XorName, +) -> Result<(XorName, PayeeQuote), PayError> { + let mut retries = 0; + + loop { + match fetch_store_quote(network, content_addr).await { + Ok(quote) => { + break Ok((content_addr, quote)); + } + Err(err) if retries < 2 => { + retries += 1; + tracing::error!("Error while fetching store quote: {err:?}, retry #{retries}"); + } + Err(err) => { + tracing::error!( + "Error while fetching store quote: {err:?}, stopping after {retries} retries" + ); + break Err(PayError::CouldNotGetStoreQuote(content_addr)); + } + } + } +} + +/// Fetch a store quote for a content address. +async fn fetch_store_quote( + network: &Network, + content_addr: XorName, +) -> Result { + network + .get_store_costs_from_network( + NetworkAddress::from_chunk_address(ChunkAddress::new(content_addr)), + vec![], + ) + .await +} + +/// Form to be executed payments and already executed payments from a cost map. +fn extract_quote_payments( + cost_map: &HashMap, +) -> (Vec, Vec) { + let mut to_be_paid = vec![]; + let mut already_paid = vec![]; + + for (chunk_address, quote) in cost_map.iter() { + if quote.2.cost.is_zero() { + already_paid.push(*chunk_address); + } else { + to_be_paid.push(( + quote.2.hash(), + quote.2.rewards_address, + quote.2.cost.as_atto(), + )); + } + } + + (to_be_paid, already_paid) +} + +/// Construct payment proofs from cost map and payments map. +fn construct_proofs( + cost_map: &HashMap, + payments: &BTreeMap, +) -> HashMap { + cost_map + .iter() + .filter_map(|(xor_name, (_, _, quote))| { + payments.get("e.hash()).map(|tx_hash| { + ( + *xor_name, + ProofOfPayment { + quote: quote.clone(), + tx_hash: *tx_hash, + }, + ) + }) + }) + .collect() +} diff --git a/autonomi/src/client/files.rs b/autonomi/src/client/files.rs index 7c01776e47..41cf3a328a 100644 --- a/autonomi/src/client/files.rs +++ b/autonomi/src/client/files.rs @@ -53,14 +53,53 @@ impl Client { let data = self.get(file.data_map).await?; Ok(data) } -} -pub trait Files: ClientWrapper { - async fn fetch_root(&mut self, address: XorName) -> Result { - self.client_mut().fetch_root(address).await - } + /// Upload a directory to the network. The directory is recursively walked. + #[cfg(feature = "fs")] + pub async fn upload_from_dir( + &mut self, + path: PathBuf, + wallet: &Wallet, + ) -> Result<(Root, XorName), UploadError> { + let mut map = HashMap::new(); + + for entry in WalkDir::new(path) { + let entry = entry?; + + if !entry.file_type().is_file() { + continue; + } + + let path = entry.path().to_path_buf(); + tracing::info!("Uploading file: {path:?}"); + let file = upload_from_file(self, path.clone(), wallet).await?; + + map.insert(path, file); + } - async fn fetch_file(&mut self, file: &FilePointer) -> Result { - self.client_mut().fetch_file(file).await + let root = Root { map }; + let root_serialized = rmp_serde::to_vec(&root).expect("TODO"); + + let xor_name = self.put(Bytes::from(root_serialized), wallet).await?; + + Ok((root, xor_name)) } } + +async fn upload_from_file( + client: &mut Client, + path: PathBuf, + wallet: &Wallet, +) -> Result { + let data = tokio::fs::read(path).await?; + let data = Bytes::from(data); + + let addr = client.put(data, wallet).await?; + + // TODO: Set created_at and modified_at + Ok(FilePointer { + data_map: addr, + created_at: 0, + modified_at: 0, + }) +} diff --git a/autonomi/src/client/mod.rs b/autonomi/src/client/mod.rs index 292bf53275..2900ae12b7 100644 --- a/autonomi/src/client/mod.rs +++ b/autonomi/src/client/mod.rs @@ -169,25 +169,3 @@ async fn handle_event_receiver( // TODO: Handle closing of network events sender } - -pub trait ClientWrapper { - fn from_client(client: Client) -> Self; - - fn client(&self) -> &Client; - - fn client_mut(&mut self) -> &mut Client; - - fn into_client(self) -> Client; - - fn network(&self) -> &Network { - &self.client().network - } - - async fn connect(peers: &[Multiaddr]) -> Result - where - Self: Sized, - { - let client = Client::connect(peers).await?; - Ok(Self::from_client(client)) - } -} diff --git a/autonomi/src/client/registers.rs b/autonomi/src/client/registers.rs index fb87071a42..47d4dbdf74 100644 --- a/autonomi/src/client/registers.rs +++ b/autonomi/src/client/registers.rs @@ -143,21 +143,132 @@ impl Client { Ok(()) } -} -pub trait Registers: ClientWrapper { - async fn fetch_register(&self, address: RegisterAddress) -> Result { - self.client().fetch_register(address).await + /// Creates a new Register with an initial value and uploads it to the network. + pub async fn create_register( + &mut self, + value: Bytes, + name: XorName, + owner: SecretKey, + wallet: &Wallet, + ) -> Result { + let pk = owner.public_key(); + + // Owner can write to the register. + let permissions = Permissions::new_with([pk]); + let mut register = ClientRegister::new(pk, name, permissions); + let address = NetworkAddress::from_register_address(*register.address()); + + let entries = register + .read() + .into_iter() + .map(|(entry_hash, _value)| entry_hash) + .collect(); + + // TODO: Handle error. + let _ = register.write(value.into(), &entries, &owner); + let reg_xor = register.address().xorname(); + let (payment_proofs, _) = self.pay(std::iter::once(reg_xor), wallet).await?; + // Should always be there, else it would have failed on the payment step. + let proof = payment_proofs.get(®_xor).expect("Missing proof"); + let payee = proof.to_peer_id_payee().expect("Missing payee Peer ID"); + let signed_register = register.clone().into_signed(&owner).expect("TODO"); + + let record = Record { + key: address.to_record_key(), + value: try_serialize_record( + &(proof, &signed_register), + RecordKind::RegisterWithPayment, + ) + .map_err(|_| RegisterError::Serialization)? + .to_vec(), + publisher: None, + expires: None, + }; + + let put_cfg = PutRecordCfg { + put_quorum: Quorum::All, + retry_strategy: None, + use_put_record_to: Some(vec![payee]), + verification: None, + }; + + self.network().put_record(record, &put_cfg).await?; + + Ok(Register { + inner: signed_register, + }) + } + + /// Fetches a Register from the network. + pub async fn fetch_register( + &self, + address: RegisterAddress, + ) -> Result { + let network_address = NetworkAddress::from_register_address(address); + let key = network_address.to_record_key(); + + let get_cfg = GetRecordCfg { + get_quorum: Quorum::One, + retry_strategy: None, + target_record: None, + expected_holders: Default::default(), + is_register: true, + }; + + let record = self + .network() + .get_record_from_network(key, &get_cfg) + .await?; + + let register: SignedRegister = + try_deserialize_record(&record).map_err(|_| RegisterError::Serialization)?; + + Ok(Register { inner: register }) } - async fn update_register( + /// Updates a Register on the network with a new value. This will overwrite existing value(s). + pub async fn update_register( &self, register: Register, new_value: Bytes, owner: SecretKey, ) -> Result<(), RegisterError> { - self.client() - .update_register(register, new_value, owner) - .await + // Fetch the current register + let mut signed_register = register.inner; + let mut register = signed_register.clone().register().expect("TODO"); + + // Get all current branches + let children: BTreeSet = register.read().into_iter().map(|(e, _)| e).collect(); + + // Write the new value to all branches + let (_, op) = register + .write(new_value.to_vec(), &children, &owner) + .expect("TODO"); + + // Apply the operation to the register + signed_register.add_op(op.clone()).expect("TODO"); + + // Prepare the record for network storage + let record = Record { + key: NetworkAddress::from_register_address(*register.address()).to_record_key(), + value: try_serialize_record(&signed_register, RecordKind::Register) + .map_err(|_| RegisterError::Serialization)? + .to_vec(), + publisher: None, + expires: None, + }; + + let put_cfg = PutRecordCfg { + put_quorum: Quorum::All, + retry_strategy: None, + use_put_record_to: None, + verification: None, + }; + + // Store the updated register on the network + self.network().put_record(record, &put_cfg).await?; + + Ok(()) } } diff --git a/autonomi/src/client/vault.rs b/autonomi/src/client/vault.rs index 7cc1d080bb..779aba4b59 100644 --- a/autonomi/src/client/vault.rs +++ b/autonomi/src/client/vault.rs @@ -77,22 +77,96 @@ impl Client { Ok(pad) } -} -pub trait Vault: ClientWrapper { - fn with_vault_entropy(self, bytes: Bytes) -> Result - where - Self: Sized, - { - let client = self.into_client().with_vault_entropy(bytes)?; - Ok(Self::from_client(client)) - } + /// Put data into the client's VaultPacket + /// + /// Returns Ok(None) early if no vault packet is defined. + /// + /// Pays for a new VaultPacket if none yet created for the client. Returns the current version + /// of the data on success. + pub async fn write_bytes_to_vault_if_defined( + &mut self, + data: Bytes, + wallet: &mut Wallet, + ) -> Result, PutError> { + // Exit early if no vault packet defined + let Some(client_sk) = self.client().vault_secret_key.as_ref() else { + return Ok(None); + }; - async fn fetch_and_decrypt_vault(&self) -> Result, VaultError> { - self.client().fetch_and_decrypt_vault().await - } + let client_pk = client_sk.public_key(); - async fn get_vault_from_network(&self) -> Result { - self.client().get_vault_from_network().await + let pad_res = self.get_vault_from_network().await; + let mut is_new = true; + + let mut scratch = if let Ok(existing_data) = pad_res { + tracing::info!("Scratchpad already exists, returning existing data"); + + info!( + "scratch already exists, is version {:?}", + existing_data.count() + ); + + is_new = false; + existing_data + } else { + tracing::trace!("new scratchpad creation"); + Scratchpad::new(client_pk) + }; + + let next_count = scratch.update_and_sign(data, client_sk); + let scratch_address = scratch.network_address(); + let scratch_key = scratch_address.to_record_key(); + + let record = if is_new { + self.pay( + [&scratch_address].iter().filter_map(|f| f.as_xorname()), + wallet, + ) + .await?; + + let scratch_xor = scratch_address.as_xorname().ok_or(PutError::VaultXorName)?; + let (payment_proofs, _) = self.pay(std::iter::once(scratch_xor), wallet).await?; + // Should always be there, else it would have failed on the payment step. + let proof = payment_proofs.get(&scratch_xor).expect("Missing proof"); + + Record { + key: scratch_key, + value: try_serialize_record(&(proof, scratch), RecordKind::ScratchpadWithPayment) + .map_err(|_| PutError::Serialization)? + .to_vec(), + publisher: None, + expires: None, + } + } else { + Record { + key: scratch_key, + value: try_serialize_record(&scratch, RecordKind::Scratchpad) + .map_err(|_| PutError::Serialization)? + .to_vec(), + publisher: None, + expires: None, + } + }; + + let put_cfg = PutRecordCfg { + put_quorum: Quorum::Majority, + retry_strategy: Some(RetryStrategy::Balanced), + use_put_record_to: None, + verification: Some(( + VerificationKind::Network, + GetRecordCfg { + get_quorum: Quorum::Majority, + retry_strategy: None, + target_record: None, + expected_holders: HashSet::new(), + is_register: false, + }, + )), + }; + + self.network().put_record(record, &put_cfg).await?; + + Ok(Some(next_count)) } } diff --git a/autonomi/src/evm/client/data.rs b/autonomi/src/evm/client/data.rs index 9061b6fb19..1f724fe127 100644 --- a/autonomi/src/evm/client/data.rs +++ b/autonomi/src/evm/client/data.rs @@ -1,9 +1,10 @@ use crate::client::data::{Data, PayError, PutError}; -use crate::client::ClientWrapper; -use crate::evm::client::EvmClient; +use crate::evm::client::Client; use crate::self_encryption::encrypt; use bytes::Bytes; use evmlib::common::{QuoteHash, QuotePayment, TxHash}; + +#[cfg(feature = "evm-payments")] use evmlib::wallet::Wallet; use libp2p::futures; use libp2p::kad::{Quorum, Record}; @@ -17,9 +18,7 @@ use sn_protocol::{ use std::collections::{BTreeMap, HashMap}; use xor_name::XorName; -impl Data for EvmClient {} - -impl EvmClient { +impl Client { /// Upload a piece of data to the network. This data will be self-encrypted, /// and the data map XOR address will be returned. pub async fn put(&mut self, data: Bytes, wallet: &Wallet) -> Result { @@ -57,6 +56,7 @@ impl EvmClient { pub(crate) async fn pay( &mut self, content_addrs: impl Iterator, + [cfg::feature = "vault"] wallet: &Wallet, ) -> Result<(HashMap, Vec), PayError> { let cost_map = self.get_store_quotes(content_addrs).await?; diff --git a/autonomi/src/evm/client/files.rs b/autonomi/src/evm/client/files.rs deleted file mode 100644 index be8e02e1cb..0000000000 --- a/autonomi/src/evm/client/files.rs +++ /dev/null @@ -1,60 +0,0 @@ -use crate::client::files::{FilePointer, Files, Root, UploadError}; -use crate::evm::client::EvmClient; -use bytes::{BufMut, Bytes}; -use evmlib::wallet::Wallet; -use std::{collections::HashMap, path::PathBuf}; -use walkdir::WalkDir; -use xor_name::XorName; - -impl Files for EvmClient {} - -impl EvmClient { - /// Upload a directory to the network. The directory is recursively walked. - #[cfg(feature = "fs")] - pub async fn upload_from_dir( - &mut self, - path: PathBuf, - wallet: &Wallet, - ) -> Result<(Root, XorName), UploadError> { - let mut map = HashMap::new(); - - for entry in WalkDir::new(path) { - let entry = entry?; - - if !entry.file_type().is_file() { - continue; - } - - let path = entry.path().to_path_buf(); - tracing::info!("Uploading file: {path:?}"); - let file = upload_from_file(self, path.clone(), wallet).await?; - - map.insert(path, file); - } - - let root = Root { map }; - let root_serialized = rmp_serde::to_vec(&root).expect("TODO"); - - let xor_name = self.put(Bytes::from(root_serialized), wallet).await?; - - Ok((root, xor_name)) - } -} - -async fn upload_from_file( - client: &mut EvmClient, - path: PathBuf, - wallet: &Wallet, -) -> Result { - let data = tokio::fs::read(path).await?; - let data = Bytes::from(data); - - let addr = client.put(data, wallet).await?; - - // TODO: Set created_at and modified_at - Ok(FilePointer { - data_map: addr, - created_at: 0, - modified_at: 0, - }) -} diff --git a/autonomi/src/evm/client/mod.rs b/autonomi/src/evm/client/mod.rs deleted file mode 100644 index 855dc30256..0000000000 --- a/autonomi/src/evm/client/mod.rs +++ /dev/null @@ -1,33 +0,0 @@ -use crate::client::{Client, ClientWrapper}; - -#[cfg(feature = "data")] -pub mod data; -#[cfg(feature = "files")] -pub mod files; -#[cfg(feature = "registers")] -pub mod registers; -#[cfg(feature = "vault")] -pub mod vault; - -#[derive(Clone)] -pub struct EvmClient { - client: Client, -} - -impl ClientWrapper for EvmClient { - fn from_client(client: Client) -> Self { - EvmClient { client } - } - - fn client(&self) -> &Client { - &self.client - } - - fn client_mut(&mut self) -> &mut Client { - &mut self.client - } - - fn into_client(self) -> Client { - self.client - } -} diff --git a/autonomi/src/evm/client/registers.rs b/autonomi/src/evm/client/registers.rs deleted file mode 100644 index 60687c478b..0000000000 --- a/autonomi/src/evm/client/registers.rs +++ /dev/null @@ -1,153 +0,0 @@ -use std::collections::BTreeSet; - -use crate::client::registers::{Register, RegisterError, Registers}; -use crate::client::ClientWrapper; -use crate::evm::client::EvmClient; -use bls::SecretKey; -use bytes::Bytes; -use evmlib::wallet::Wallet; -use libp2p::kad::{Quorum, Record}; -use sn_networking::GetRecordCfg; -use sn_networking::PutRecordCfg; -use sn_protocol::storage::try_deserialize_record; -use sn_protocol::storage::try_serialize_record; -use sn_protocol::storage::RecordKind; -use sn_protocol::storage::RegisterAddress; -use sn_protocol::NetworkAddress; -use sn_registers::EntryHash; -use sn_registers::Permissions; -use sn_registers::Register as ClientRegister; -use sn_registers::SignedRegister; -use xor_name::XorName; - -impl Registers for EvmClient {} - -impl EvmClient { - /// Creates a new Register with an initial value and uploads it to the network. - pub async fn create_register( - &mut self, - value: Bytes, - name: XorName, - owner: SecretKey, - wallet: &Wallet, - ) -> Result { - let pk = owner.public_key(); - - // Owner can write to the register. - let permissions = Permissions::new_with([pk]); - let mut register = ClientRegister::new(pk, name, permissions); - let address = NetworkAddress::from_register_address(*register.address()); - - let entries = register - .read() - .into_iter() - .map(|(entry_hash, _value)| entry_hash) - .collect(); - - // TODO: Handle error. - let _ = register.write(value.into(), &entries, &owner); - let reg_xor = register.address().xorname(); - let (payment_proofs, _) = self.pay(std::iter::once(reg_xor), wallet).await?; - // Should always be there, else it would have failed on the payment step. - let proof = payment_proofs.get(®_xor).expect("Missing proof"); - let payee = proof.to_peer_id_payee().expect("Missing payee Peer ID"); - let signed_register = register.clone().into_signed(&owner).expect("TODO"); - - let record = Record { - key: address.to_record_key(), - value: try_serialize_record( - &(proof, &signed_register), - RecordKind::RegisterWithPayment, - ) - .map_err(|_| RegisterError::Serialization)? - .to_vec(), - publisher: None, - expires: None, - }; - - let put_cfg = PutRecordCfg { - put_quorum: Quorum::All, - retry_strategy: None, - use_put_record_to: Some(vec![payee]), - verification: None, - }; - - self.network().put_record(record, &put_cfg).await?; - - Ok(Register { - inner: signed_register, - }) - } - - /// Fetches a Register from the network. - pub async fn fetch_register( - &self, - address: RegisterAddress, - ) -> Result { - let network_address = NetworkAddress::from_register_address(address); - let key = network_address.to_record_key(); - - let get_cfg = GetRecordCfg { - get_quorum: Quorum::One, - retry_strategy: None, - target_record: None, - expected_holders: Default::default(), - is_register: true, - }; - - let record = self - .network() - .get_record_from_network(key, &get_cfg) - .await?; - - let register: SignedRegister = - try_deserialize_record(&record).map_err(|_| RegisterError::Serialization)?; - - Ok(Register { inner: register }) - } - - /// Updates a Register on the network with a new value. This will overwrite existing value(s). - pub async fn update_register( - &self, - register: Register, - new_value: Bytes, - owner: SecretKey, - ) -> Result<(), RegisterError> { - // Fetch the current register - let mut signed_register = register.inner; - let mut register = signed_register.clone().register().expect("TODO"); - - // Get all current branches - let children: BTreeSet = register.read().into_iter().map(|(e, _)| e).collect(); - - // Write the new value to all branches - let (_, op) = register - .write(new_value.to_vec(), &children, &owner) - .expect("TODO"); - - // Apply the operation to the register - signed_register.add_op(op.clone()).expect("TODO"); - - // Prepare the record for network storage - let record = Record { - key: NetworkAddress::from_register_address(*register.address()).to_record_key(), - value: try_serialize_record(&signed_register, RecordKind::Register) - .map_err(|_| RegisterError::Serialization)? - .to_vec(), - publisher: None, - expires: None, - }; - - let put_cfg = PutRecordCfg { - put_quorum: Quorum::All, - retry_strategy: None, - use_put_record_to: None, - verification: None, - }; - - // Store the updated register on the network - self.network().put_record(record, &put_cfg).await?; - - Ok(()) - } -} diff --git a/autonomi/src/evm/client/vault.rs b/autonomi/src/evm/client/vault.rs deleted file mode 100644 index c71b9803b7..0000000000 --- a/autonomi/src/evm/client/vault.rs +++ /dev/null @@ -1,107 +0,0 @@ -use crate::client::data::PutError; -use crate::client::vault::Vault; -use crate::client::ClientWrapper; -use crate::evm::client::EvmClient; -use bytes::Bytes; -use evmlib::wallet::Wallet; -use libp2p::kad::{Quorum, Record}; -use sn_networking::{GetRecordCfg, PutRecordCfg, VerificationKind}; -use sn_protocol::storage::{try_serialize_record, RecordKind, RetryStrategy, Scratchpad}; -use std::collections::HashSet; -use tracing::info; - -impl Vault for EvmClient {} - -impl EvmClient { - /// Put data into the client's VaultPacket - /// - /// Returns Ok(None) early if no vault packet is defined. - /// - /// Pays for a new VaultPacket if none yet created for the client. Returns the current version - /// of the data on success. - pub async fn write_bytes_to_vault_if_defined( - &mut self, - data: Bytes, - wallet: &mut Wallet, - ) -> Result, PutError> { - // Exit early if no vault packet defined - let Some(client_sk) = self.client().vault_secret_key.as_ref() else { - return Ok(None); - }; - - let client_pk = client_sk.public_key(); - - let pad_res = self.get_vault_from_network().await; - let mut is_new = true; - - let mut scratch = if let Ok(existing_data) = pad_res { - tracing::info!("Scratchpad already exists, returning existing data"); - - info!( - "scratch already exists, is version {:?}", - existing_data.count() - ); - - is_new = false; - existing_data - } else { - tracing::trace!("new scratchpad creation"); - Scratchpad::new(client_pk) - }; - - let next_count = scratch.update_and_sign(data, client_sk); - let scratch_address = scratch.network_address(); - let scratch_key = scratch_address.to_record_key(); - - let record = if is_new { - self.pay( - [&scratch_address].iter().filter_map(|f| f.as_xorname()), - wallet, - ) - .await?; - - let scratch_xor = scratch_address.as_xorname().ok_or(PutError::VaultXorName)?; - let (payment_proofs, _) = self.pay(std::iter::once(scratch_xor), wallet).await?; - // Should always be there, else it would have failed on the payment step. - let proof = payment_proofs.get(&scratch_xor).expect("Missing proof"); - - Record { - key: scratch_key, - value: try_serialize_record(&(proof, scratch), RecordKind::ScratchpadWithPayment) - .map_err(|_| PutError::Serialization)? - .to_vec(), - publisher: None, - expires: None, - } - } else { - Record { - key: scratch_key, - value: try_serialize_record(&scratch, RecordKind::Scratchpad) - .map_err(|_| PutError::Serialization)? - .to_vec(), - publisher: None, - expires: None, - } - }; - - let put_cfg = PutRecordCfg { - put_quorum: Quorum::Majority, - retry_strategy: Some(RetryStrategy::Balanced), - use_put_record_to: None, - verification: Some(( - VerificationKind::Network, - GetRecordCfg { - get_quorum: Quorum::Majority, - retry_strategy: None, - target_record: None, - expected_holders: HashSet::new(), - is_register: false, - }, - )), - }; - - self.network().put_record(record, &put_cfg).await?; - - Ok(Some(next_count)) - } -} diff --git a/autonomi/src/evm/mod.rs b/autonomi/src/evm/mod.rs deleted file mode 100644 index a7c160ea53..0000000000 --- a/autonomi/src/evm/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub use crate::client::Client; - -pub mod client; - -pub type EvmWallet = evmlib::wallet::Wallet; diff --git a/autonomi/src/lib.rs b/autonomi/src/lib.rs index 295c3ca576..f156d7aadc 100644 --- a/autonomi/src/lib.rs +++ b/autonomi/src/lib.rs @@ -21,23 +21,19 @@ // docs.rs generation will enable unstable `doc_cfg` feature #![cfg_attr(docsrs, feature(doc_cfg))] +pub mod client; #[doc(no_inline)] // Place this under 'Re-exports' in the docs. pub use bytes::Bytes; #[doc(no_inline)] // Place this under 'Re-exports' in the docs. pub use libp2p::Multiaddr; -pub mod client; -#[cfg(feature = "evm-payments")] -pub mod evm; +pub use client::Client; + #[cfg(feature = "native-payments")] pub mod native; + +#[cfg(feature = "data")] mod self_encryption; #[cfg(feature = "transfers")] const VERIFY_STORE: bool = true; - -#[cfg(all(feature = "native-payments", not(feature = "evm-payments")))] -pub type Client = native::Client; - -#[cfg(all(feature = "evm-payments", not(feature = "native-payments")))] -pub type Client = evm::Client; diff --git a/autonomi/tests/evm/file.rs b/autonomi/tests/evm/file.rs index 4b4abfee35..6a85ff3f07 100644 --- a/autonomi/tests/evm/file.rs +++ b/autonomi/tests/evm/file.rs @@ -1,78 +1,82 @@ -use crate::common; -use crate::common::{evm_network_from_env, evm_wallet_from_env_or_default}; -use crate::evm::Client; -use bytes::Bytes; -use eyre::bail; -use std::time::Duration; -use tokio::time::sleep; +#[cfg(feature = "evm-payments")] +mod test { -#[tokio::test] -async fn file() -> Result<(), Box> { - common::enable_logging(); + use crate::common; + use crate::common::{evm_network_from_env, evm_wallet_from_env_or_default}; + use crate::evm::Client; + use bytes::Bytes; + use eyre::bail; + use std::time::Duration; + use tokio::time::sleep; - let network = evm_network_from_env(); - let mut client = Client::connect(&[]).await.unwrap(); - let mut wallet = evm_wallet_from_env_or_default(network); + #[tokio::test] + async fn file() -> Result<(), Box> { + common::enable_logging(); - // let data = common::gen_random_data(1024 * 1024 * 1000); - // let user_key = common::gen_random_data(32); + let network = evm_network_from_env(); + let mut client = Client::connect(&[]).await.unwrap(); + let mut wallet = evm_wallet_from_env_or_default(network); - let (root, addr) = client - .upload_from_dir("tests/file/test_dir".into(), &mut wallet) - .await?; + // let data = common::gen_random_data(1024 * 1024 * 1000); + // let user_key = common::gen_random_data(32); - sleep(Duration::from_secs(10)).await; + let (root, addr) = client + .upload_from_dir("tests/file/test_dir".into(), &mut wallet) + .await?; - let root_fetched = client.fetch_root(addr).await?; + sleep(Duration::from_secs(10)).await; - assert_eq!( - root.map, root_fetched.map, - "root fetched should match root put" - ); + let root_fetched = client.fetch_root(addr).await?; - Ok(()) -} - -#[cfg(feature = "vault")] -#[tokio::test] -async fn file_into_vault() -> eyre::Result<()> { - common::enable_logging(); - - let network = evm_network_from_env(); + assert_eq!( + root.map, root_fetched.map, + "root fetched should match root put" + ); - let mut client = Client::connect(&[]) - .await? - .with_vault_entropy(Bytes::from("at least 32 bytes of entropy here"))?; + Ok(()) + } - let mut wallet = evm_wallet_from_env_or_default(network); + #[cfg(feature = "vault")] + #[tokio::test] + async fn file_into_vault() -> eyre::Result<()> { + common::enable_logging(); - let (root, addr) = client - .upload_from_dir("tests/file/test_dir".into(), &mut wallet) - .await?; - sleep(Duration::from_secs(2)).await; + let network = evm_network_from_env(); - let root_fetched = client.fetch_root(addr).await?; + let mut client = Client::connect(&[]) + .await? + .with_vault_entropy(Bytes::from("at least 32 bytes of entropy here"))?; - assert_eq!( - root.map, root_fetched.map, - "root fetched should match root put" - ); + let mut wallet = evm_wallet_from_env_or_default(network); - // now assert over the stored account packet - let new_client = Client::connect(&[]) - .await? - .with_vault_entropy(Bytes::from("at least 32 bytes of entropy here"))?; + let (root, addr) = client + .upload_from_dir("tests/file/test_dir".into(), &mut wallet) + .await?; + sleep(Duration::from_secs(2)).await; - if let Some(ap) = new_client.fetch_and_decrypt_vault().await? { - let ap_root_fetched = Client::deserialise_root(ap)?; + let root_fetched = client.fetch_root(addr).await?; assert_eq!( - root.map, ap_root_fetched.map, + root.map, root_fetched.map, "root fetched should match root put" ); - } else { - bail!("No account packet found"); - } - Ok(()) + // now assert over the stored account packet + let new_client = Client::connect(&[]) + .await? + .with_vault_entropy(Bytes::from("at least 32 bytes of entropy here"))?; + + if let Some(ap) = new_client.fetch_and_decrypt_vault().await? { + let ap_root_fetched = Client::deserialise_root(ap)?; + + assert_eq!( + root.map, ap_root_fetched.map, + "root fetched should match root put" + ); + } else { + bail!("No account packet found"); + } + + Ok(()) + } } diff --git a/autonomi/tests/evm/mod.rs b/autonomi/tests/evm/mod.rs index fa74db16b4..cdddaa504a 100644 --- a/autonomi/tests/evm/mod.rs +++ b/autonomi/tests/evm/mod.rs @@ -1,5 +1,3 @@ -use autonomi; - #[cfg(feature = "files")] mod file; #[cfg(feature = "data")] @@ -7,5 +5,3 @@ mod put; #[cfg(feature = "registers")] mod register; mod wallet; - -pub type Client = autonomi::evm::client::EvmClient; diff --git a/autonomi/tests/evm/put.rs b/autonomi/tests/evm/put.rs index 7502ceef4d..9d6a236f85 100644 --- a/autonomi/tests/evm/put.rs +++ b/autonomi/tests/evm/put.rs @@ -2,9 +2,7 @@ use std::time::Duration; use crate::common; use crate::common::{evm_network_from_env, evm_wallet_from_env_or_default}; -use crate::evm::Client; -use autonomi::client::data::Data; -use autonomi::client::ClientWrapper; +use autonomi::Client; use tokio::time::sleep; #[tokio::test] diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index 8369665c12..7ddad3cdce 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -379,7 +379,9 @@ impl Network { }) => { // Check the quote itself is valid. if quote.cost - != AttoTokens::from_u64(calculate_cost_for_records(quote.quoting_metrics.close_records_stored)) + != AttoTokens::from_u64(calculate_cost_for_records( + quote.quoting_metrics.close_records_stored, + )) { warn!("Received invalid quote from {peer_address:?}, {quote:?}"); continue; diff --git a/sn_networking/src/record_store.rs b/sn_networking/src/record_store.rs index 7ce96c2e41..fb58db61e8 100644 --- a/sn_networking/src/record_store.rs +++ b/sn_networking/src/record_store.rs @@ -964,7 +964,9 @@ mod tests { use quickcheck::*; use sn_evm::utils::dummy_address; use sn_evm::{PaymentQuote, RewardsAddress}; - use sn_protocol::storage::{try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, Scratchpad}; + use sn_protocol::storage::{ + try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, Scratchpad, + }; use std::collections::BTreeMap; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use tokio::runtime::Runtime; @@ -1656,8 +1658,10 @@ mod tests { peer.records_stored.fetch_add(1, Ordering::Relaxed); if peer_index == payee_index { - peer.nanos_earned - .fetch_add(cost.as_atto().try_into().unwrap_or(u64::MAX), Ordering::Relaxed); + peer.nanos_earned.fetch_add( + cost.as_atto().try_into().unwrap_or(u64::MAX), + Ordering::Relaxed, + ); peer.payments_received.fetch_add(1, Ordering::Relaxed); } } diff --git a/sn_node/src/put_validation.rs b/sn_node/src/put_validation.rs index f78d0990fa..85a38a6c1c 100644 --- a/sn_node/src/put_validation.rs +++ b/sn_node/src/put_validation.rs @@ -12,7 +12,8 @@ use sn_evm::ProofOfPayment; use sn_networking::{get_raw_signed_spends_from_record, GetRecordError, NetworkError}; use sn_protocol::{ storage::{ - try_deserialize_record, try_serialize_record, Chunk, RecordHeader, RecordKind, RecordType, Scratchpad, SpendAddress + try_deserialize_record, try_serialize_record, Chunk, RecordHeader, RecordKind, RecordType, + Scratchpad, SpendAddress, }, NetworkAddress, PrettyPrintRecordKey, }; diff --git a/sn_node_manager/src/local.rs b/sn_node_manager/src/local.rs index 3f31ac899e..6373ba46d4 100644 --- a/sn_node_manager/src/local.rs +++ b/sn_node_manager/src/local.rs @@ -8,8 +8,7 @@ use crate::add_services::config::PortRange; use crate::helpers::{ - check_port_availability, get_bin_version, get_start_port_if_applicable, - increment_port_option, + check_port_availability, get_bin_version, get_start_port_if_applicable, increment_port_option, }; use color_eyre::eyre::OptionExt; use color_eyre::{eyre::eyre, Result}; @@ -22,7 +21,8 @@ use sn_evm::{EvmNetwork, RewardsAddress}; use sn_logging::LogFormat; use sn_service_management::{ control::ServiceControl, - rpc::{RpcActions, RpcClient}, NodeRegistry, NodeServiceData, ServiceStatus, + rpc::{RpcActions, RpcClient}, + NodeRegistry, NodeServiceData, ServiceStatus, }; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr},