Skip to content

Commit

Permalink
Merge pull request #2161 from joshuef/SimplifyAutClient
Browse files Browse the repository at this point in the history
autonomi client simplification
  • Loading branch information
grumbach authored Oct 1, 2024
2 parents 6804518 + bd589da commit 43ce3d2
Show file tree
Hide file tree
Showing 19 changed files with 550 additions and 515 deletions.
230 changes: 210 additions & 20 deletions autonomi/src/client/data.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
use crate::client::{Client, ClientWrapper};
use crate::self_encryption::DataMapLevel;
use bytes::Bytes;
use evmlib::wallet;
use libp2p::kad::Quorum;
use libp2p::kad::{Quorum, Record};

use self_encryption::{decrypt_full_set, DataMap, EncryptedChunk};
use sn_networking::{GetRecordCfg, NetworkError};
use sn_protocol::storage::{try_deserialize_record, Chunk, ChunkAddress, RecordHeader, RecordKind};
use sn_protocol::NetworkAddress;
use std::collections::HashSet;
use tokio::task::JoinError;
use xor_name::XorName;

use crate::{self_encryption::encrypt, Client};
use evmlib::common::{QuoteHash, QuotePayment, TxHash};
use evmlib::wallet::Wallet;
use libp2p::futures;
use sn_evm::ProofOfPayment;
use sn_networking::PutRecordCfg;
use sn_networking::{GetRecordCfg, Network, NetworkError, PayeeQuote};
use sn_protocol::{
storage::{
try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, RecordHeader, RecordKind,
},
NetworkAddress,
};
use std::collections::{BTreeMap, HashMap};

/// Errors that can occur during the put operation.
#[derive(Debug, thiserror::Error)]
pub enum PutError {
Expand All @@ -22,14 +34,18 @@ pub enum PutError {
VaultXorName,
#[error("A network error occurred.")]
Network(#[from] NetworkError),
#[error("Error occurred during payment.")]
PayError(#[from] PayError),

// native token
#[cfg(feature = "native-payments")]
#[error("A wallet error occurred.")]
Wallet(#[from] sn_transfers::WalletError),

// evm token
#[cfg(feature = "evm-payments")]
#[error("A wallet error occurred.")]
EvmWallet(#[from] sn_evm::EvmError),
#[error("Error occurred during payment.")]
PayError(#[from] PayError),
Wallet(#[from] sn_evm::EvmError),
}

/// Errors that can occur during the pay operation.
Expand Down Expand Up @@ -144,24 +160,198 @@ impl Client {
};
}
}
}

pub trait Data: ClientWrapper {
async fn get(&self, data_map_addr: XorName) -> Result<Bytes, GetError> {
self.client().get(data_map_addr).await
/// Upload a piece of data to the network. This data will be self-encrypted,
/// and the data map XOR address will be returned.
pub async fn put(&mut self, data: Bytes, wallet: &Wallet) -> Result<XorName, PutError> {
let now = std::time::Instant::now();
let (data_map_chunk, chunks) = encrypt(data)?;

tracing::debug!("Encryption took: {:.2?}", now.elapsed());

let map_xor_name = *data_map_chunk.address().xorname();
let mut xor_names = vec![map_xor_name];

for chunk in &chunks {
xor_names.push(*chunk.name());
}

// Pay for all chunks + data map chunk
let (payment_proofs, _free_chunks) = self.pay(xor_names.into_iter(), wallet).await?;

// Upload data map
if let Some(proof) = payment_proofs.get(&map_xor_name) {
self.upload_chunk(data_map_chunk.clone(), proof.clone())
.await?;
}

// Upload the rest of the chunks
for chunk in chunks {
if let Some(proof) = payment_proofs.get(chunk.name()) {
self.upload_chunk(chunk, proof.clone()).await?;
}
}

Ok(map_xor_name)
}

async fn fetch_chunk(&self, addr: XorName) -> Result<Chunk, GetError> {
self.client().fetch_chunk(addr).await
pub(crate) async fn pay(
&mut self,
content_addrs: impl Iterator<Item = XorName>,
wallet: &Wallet,
) -> Result<(HashMap<XorName, ProofOfPayment>, Vec<XorName>), PayError> {
let cost_map = self.get_store_quotes(content_addrs).await?;
let (quote_payments, skipped_chunks) = extract_quote_payments(&cost_map);

// TODO: the error might contain some succeeded quote payments as well. These should be returned on err, so that they can be skipped when retrying.
// TODO: retry when it fails?
// Execute chunk payments
let payments = wallet
.pay_for_quotes(quote_payments)
.await
.map_err(|err| PayError::from(err.0))?;

let proofs = construct_proofs(&cost_map, &payments);

tracing::trace!(
"Chunk payments of {} chunks completed. {} chunks were free / already paid for",
proofs.len(),
skipped_chunks.len()
);

Ok((proofs, skipped_chunks))
}

async fn fetch_from_data_map(&self, data_map: &DataMap) -> Result<Bytes, GetError> {
self.client().fetch_from_data_map(data_map).await
async fn get_store_quotes(
&mut self,
content_addrs: impl Iterator<Item = XorName>,
) -> Result<HashMap<XorName, PayeeQuote>, PayError> {
let futures: Vec<_> = content_addrs
.into_iter()
.map(|content_addr| fetch_store_quote_with_retries(&self.network, content_addr))
.collect();

let quotes = futures::future::try_join_all(futures).await?;

Ok(quotes.into_iter().collect::<HashMap<XorName, PayeeQuote>>())
}

async fn fetch_from_data_map_chunk(&self, data_map_bytes: &Bytes) -> Result<Bytes, GetError> {
self.client()
.fetch_from_data_map_chunk(data_map_bytes)
.await
/// Directly writes Chunks to the network in the form of immutable self encrypted chunks.
async fn upload_chunk(
&self,
chunk: Chunk,
proof_of_payment: ProofOfPayment,
) -> Result<(), PutError> {
self.store_chunk(chunk, proof_of_payment).await?;
Ok(())
}

/// Actually store a chunk to a peer.
async fn store_chunk(&self, chunk: Chunk, payment: ProofOfPayment) -> Result<(), PutError> {
let storing_node = payment.to_peer_id_payee().expect("Missing node Peer ID");

tracing::debug!("Storing chunk: {chunk:?} to {:?}", storing_node);

let key = chunk.network_address().to_record_key();

let record_kind = RecordKind::ChunkWithPayment;
let record = Record {
key: key.clone(),
value: try_serialize_record(&(payment, chunk.clone()), record_kind)
.map_err(|_| PutError::Serialization)?
.to_vec(),
publisher: None,
expires: None,
};

let put_cfg = PutRecordCfg {
put_quorum: Quorum::One,
retry_strategy: None,
use_put_record_to: Some(vec![storing_node]),
verification: None,
};
Ok(self.network.put_record(record, &put_cfg).await?)
}
}

/// Fetch a store quote for a content address with a retry strategy.
async fn fetch_store_quote_with_retries(
network: &Network,
content_addr: XorName,
) -> Result<(XorName, PayeeQuote), PayError> {
let mut retries = 0;

loop {
match fetch_store_quote(network, content_addr).await {
Ok(quote) => {
break Ok((content_addr, quote));
}
Err(err) if retries < 2 => {
retries += 1;
tracing::error!("Error while fetching store quote: {err:?}, retry #{retries}");
}
Err(err) => {
tracing::error!(
"Error while fetching store quote: {err:?}, stopping after {retries} retries"
);
break Err(PayError::CouldNotGetStoreQuote(content_addr));
}
}
}
}

/// Fetch a store quote for a content address.
async fn fetch_store_quote(
network: &Network,
content_addr: XorName,
) -> Result<PayeeQuote, NetworkError> {
network
.get_store_costs_from_network(
NetworkAddress::from_chunk_address(ChunkAddress::new(content_addr)),
vec![],
)
.await
}

/// Form to be executed payments and already executed payments from a cost map.
fn extract_quote_payments(
cost_map: &HashMap<XorName, PayeeQuote>,
) -> (Vec<QuotePayment>, Vec<XorName>) {
let mut to_be_paid = vec![];
let mut already_paid = vec![];

for (chunk_address, quote) in cost_map.iter() {
if quote.2.cost.is_zero() {
already_paid.push(*chunk_address);
} else {
to_be_paid.push((
quote.2.hash(),
quote.2.rewards_address,
quote.2.cost.as_atto(),
));
}
}

(to_be_paid, already_paid)
}

/// Construct payment proofs from cost map and payments map.
fn construct_proofs(
cost_map: &HashMap<XorName, PayeeQuote>,
payments: &BTreeMap<QuoteHash, TxHash>,
) -> HashMap<XorName, ProofOfPayment> {
cost_map
.iter()
.filter_map(|(xor_name, (_, _, quote))| {
payments.get(&quote.hash()).map(|tx_hash| {
(
*xor_name,
ProofOfPayment {
quote: quote.clone(),
tx_hash: *tx_hash,
},
)
})
})
.collect()
}
53 changes: 46 additions & 7 deletions autonomi/src/client/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,53 @@ impl Client {
let data = self.get(file.data_map).await?;
Ok(data)
}
}

pub trait Files: ClientWrapper {
async fn fetch_root(&mut self, address: XorName) -> Result<Root, UploadError> {
self.client_mut().fetch_root(address).await
}
/// Upload a directory to the network. The directory is recursively walked.
#[cfg(feature = "fs")]
pub async fn upload_from_dir(
&mut self,
path: PathBuf,
wallet: &Wallet,
) -> Result<(Root, XorName), UploadError> {
let mut map = HashMap::new();

for entry in WalkDir::new(path) {
let entry = entry?;

if !entry.file_type().is_file() {
continue;
}

let path = entry.path().to_path_buf();
tracing::info!("Uploading file: {path:?}");
let file = upload_from_file(self, path.clone(), wallet).await?;

map.insert(path, file);
}

async fn fetch_file(&mut self, file: &FilePointer) -> Result<Bytes, UploadError> {
self.client_mut().fetch_file(file).await
let root = Root { map };
let root_serialized = rmp_serde::to_vec(&root).expect("TODO");

let xor_name = self.put(Bytes::from(root_serialized), wallet).await?;

Ok((root, xor_name))
}
}

async fn upload_from_file(
client: &mut Client,
path: PathBuf,
wallet: &Wallet,
) -> Result<FilePointer, UploadError> {
let data = tokio::fs::read(path).await?;
let data = Bytes::from(data);

let addr = client.put(data, wallet).await?;

// TODO: Set created_at and modified_at
Ok(FilePointer {
data_map: addr,
created_at: 0,
modified_at: 0,
})
}
22 changes: 0 additions & 22 deletions autonomi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,25 +169,3 @@ async fn handle_event_receiver(

// TODO: Handle closing of network events sender
}

pub trait ClientWrapper {
fn from_client(client: Client) -> Self;

fn client(&self) -> &Client;

fn client_mut(&mut self) -> &mut Client;

fn into_client(self) -> Client;

fn network(&self) -> &Network {
&self.client().network
}

async fn connect(peers: &[Multiaddr]) -> Result<Self, ConnectError>
where
Self: Sized,
{
let client = Client::connect(peers).await?;
Ok(Self::from_client(client))
}
}
Loading

0 comments on commit 43ce3d2

Please sign in to comment.