Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Evm integration step 4 #2149

Merged
merged 7 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion autonomi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,25 @@ repository = "https://github.com/maidsafe/safe_network"

[features]
default = ["data"]
full = ["data", "files", "fs", "registers", "transfers", "vault"]
full = ["data", "files", "fs", "registers", "transfers", "vault", "native-payments"]
data = ["transfers"]
vault = ["data"]
files = ["transfers", "data"]
fs = []
local = ["sn_networking/local-discovery"]
registers = ["transfers"]
transfers = []
native-payments = []
evm-payments = []

[dependencies]
bip39 = "2.0.0"
bls = { package = "blsttc", version = "8.0.1" }
bytes = { version = "1.0.1", features = ["serde"] }
curv = { version = "0.10.1", package = "sn_curv", default-features = false, features = ["num-bigint"] }
eip2333 = { version = "0.2.1", package = "sn_bls_ckd" }
const-hex = "1.12.0"
evmlib = { path = "../evmlib", version = "0.1" }
libp2p = "0.54.1"
rand = "0.8.5"
rmp-serde = "1.1.1"
Expand All @@ -36,6 +40,7 @@ sn_peers_acquisition = { path = "../sn_peers_acquisition", version = "0.5.1" }
sn_protocol = { version = "0.17.9", path = "../sn_protocol" }
sn_registers = { path = "../sn_registers", version = "0.3.19" }
sn_transfers = { path = "../sn_transfers", version = "0.19.1" }
sn_evm = { path = "../sn_evm" }
thiserror = "1.0.23"
tokio = { version = "1.35.0", features = ["sync", "fs"] }
tracing = { version = "~0.1.26" }
Expand Down
234 changes: 41 additions & 193 deletions autonomi/src/client/data.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,16 @@
use std::collections::{BTreeMap, HashSet};

use crate::self_encryption::{encrypt, DataMapLevel};
use crate::Client;
use crate::client::{Client, ClientWrapper};
use crate::self_encryption::DataMapLevel;
use bytes::Bytes;
use libp2p::{
kad::{Quorum, Record},
PeerId,
};
use evmlib::wallet;
use libp2p::kad::Quorum;
use self_encryption::{decrypt_full_set, DataMap, EncryptedChunk};
use sn_networking::{GetRecordCfg, NetworkError, PutRecordCfg};
use sn_protocol::{
storage::{
try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, RecordHeader, RecordKind,
},
NetworkAddress,
};
use sn_transfers::Payment;
use sn_transfers::{HotWallet, MainPubkey, NanoTokens, PaymentQuote};
use tokio::task::{JoinError, JoinSet};
use sn_networking::{GetRecordCfg, NetworkError};
use sn_protocol::storage::{try_deserialize_record, Chunk, ChunkAddress, RecordHeader, RecordKind};
use sn_protocol::NetworkAddress;
use std::collections::HashSet;
use tokio::task::JoinError;
use xor_name::XorName;

use super::transfers::SendSpendsError;

/// Errors that can occur during the put operation.
#[derive(Debug, thiserror::Error)]
pub enum PutError {
Expand All @@ -33,23 +22,34 @@ pub enum PutError {
VaultXorName,
#[error("A network error occurred.")]
Network(#[from] NetworkError),
#[cfg(feature = "native-payments")]
#[error("A wallet error occurred.")]
Wallet(#[from] sn_transfers::WalletError),
#[cfg(feature = "evm-payments")]
#[error("A wallet error occurred.")]
EvmWallet(#[from] sn_evm::EvmError),
#[error("Error occurred during payment.")]
PayError(#[from] PayError),
}

/// Errors that can occur during the pay operation.
#[derive(Debug, thiserror::Error)]
pub enum PayError {
#[error("Could not get store quote for: {0:?} after several retries")]
CouldNotGetStoreQuote(XorName),
#[error("Could not get store costs: {0:?}")]
CouldNotGetStoreCosts(sn_networking::NetworkError),
CouldNotGetStoreCosts(NetworkError),
#[error("Could not simultaneously fetch store costs: {0:?}")]
JoinError(JoinError),
#[cfg(feature = "native-payments")]
#[error("Hot wallet error")]
WalletError(#[from] sn_transfers::WalletError),
#[cfg(feature = "evm-payments")]
#[error("Wallet error: {0:?}")]
EvmWalletError(#[from] wallet::Error),
#[cfg(feature = "native-payments")]
#[error("Failed to send spends")]
SendSpendsError(#[from] SendSpendsError),
SendSpendsError(#[from] crate::native::client::transfers::SendSpendsError),
}

/// Errors that can occur during the get operation.
Expand All @@ -60,7 +60,7 @@ pub enum GetError {
#[error("Failed to decrypt data.")]
Decryption(crate::self_encryption::Error),
#[error("General networking error: {0:?}")]
Network(#[from] sn_networking::NetworkError),
Network(#[from] NetworkError),
#[error("General protocol error: {0:?}")]
Protocol(#[from] sn_protocol::Error),
}
Expand All @@ -80,6 +80,7 @@ impl Client {
/// Get a raw chunk from the network.
pub async fn fetch_chunk(&self, addr: XorName) -> Result<Chunk, GetError> {
tracing::info!("Getting chunk: {addr:?}");

let key = NetworkAddress::from_chunk_address(ChunkAddress::new(addr)).to_record_key();

let get_cfg = GetRecordCfg {
Expand All @@ -89,8 +90,10 @@ impl Client {
expected_holders: HashSet::new(),
is_register: false,
};

let record = self.network.get_record_from_network(key, &get_cfg).await?;
let header = RecordHeader::from_record(&record)?;

if let RecordKind::Chunk = header.kind {
let chunk: Chunk = try_deserialize_record(&record)?;
Ok(chunk)
Expand All @@ -99,40 +102,10 @@ impl Client {
}
}

/// Upload a piece of data to the network. This data will be self-encrypted,
/// and the data map XOR address will be returned.
pub async fn put(&mut self, data: Bytes, wallet: &mut HotWallet) -> Result<XorName, PutError> {
let now = std::time::Instant::now();
let (map, chunks) = encrypt(data)?;
tracing::debug!("Encryption took: {:.2?}", now.elapsed());

let map_xor_name = *map.address().xorname();

let mut xor_names = vec![];
xor_names.push(map_xor_name);
for chunk in &chunks {
xor_names.push(*chunk.name());
}

let (.., skipped_chunks) = self.pay(xor_names.into_iter(), wallet).await?;

// TODO: Upload in parallel
if !skipped_chunks.contains(map.name()) {
self.upload_chunk(map, wallet).await?;
}
for chunk in chunks {
if skipped_chunks.contains(chunk.name()) {
continue;
}
self.upload_chunk(chunk, wallet).await?;
}

Ok(map_xor_name)
}

// Fetch and decrypt all chunks in the data map.
/// Fetch and decrypt all chunks in the data map.
async fn fetch_from_data_map(&self, data_map: &DataMap) -> Result<Bytes, GetError> {
let mut encrypted_chunks = vec![];

for info in data_map.infos() {
let chunk = self.fetch_chunk(info.dst_hash).await?;
let chunk = EncryptedChunk {
Expand All @@ -148,7 +121,7 @@ impl Client {
Ok(data)
}

// Unpack a wrapped data map and fetch all bytes using self-encryption.
/// Unpack a wrapped data map and fetch all bytes using self-encryption.
async fn fetch_from_data_map_chunk(&self, data_map_bytes: &Bytes) -> Result<Bytes, GetError> {
let mut data_map_level: DataMapLevel =
rmp_serde::from_slice(data_map_bytes).map_err(GetError::InvalidDataMap)?;
Expand All @@ -171,149 +144,24 @@ impl Client {
};
}
}
}

/// Returns the storage cost, royalty fees, and skipped chunks. In that order as tuple.
pub(crate) async fn pay(
&mut self,
content_addrs: impl Iterator<Item = XorName>,
wallet: &mut HotWallet,
) -> Result<(NanoTokens, NanoTokens, Vec<XorName>), PayError> {
let mut tasks = JoinSet::new();
for content_addr in content_addrs {
let network = self.network.clone();
tasks.spawn(async move {
// TODO: retry, but where?
let cost = network
.get_store_costs_from_network(
NetworkAddress::from_chunk_address(ChunkAddress::new(content_addr)),
vec![],
)
.await
.map_err(PayError::CouldNotGetStoreCosts);

tracing::debug!("Storecosts retrieved for {content_addr:?} {cost:?}");
(content_addr, cost)
});
}
tracing::debug!("Pending store cost tasks: {:?}", tasks.len());

// collect store costs
let mut cost_map = BTreeMap::default();
let mut skipped_chunks = vec![];
while let Some(res) = tasks.join_next().await {
match res {
Ok((content_addr, Ok(cost))) => {
if cost.2.cost == NanoTokens::zero() {
skipped_chunks.push(content_addr);
tracing::debug!("Skipped existing chunk {content_addr:?}");
} else {
tracing::debug!("Storecost inserted into payment map for {content_addr:?}");
let _ = cost_map.insert(content_addr, (cost.1, cost.2, cost.0.to_bytes()));
}
}
Ok((content_addr, Err(err))) => {
tracing::warn!("Cannot get store cost for {content_addr:?} with error {err:?}");
return Err(err);
}
Err(e) => {
return Err(PayError::JoinError(e));
}
}
}

let (storage_cost, royalty_fees) = if cost_map.is_empty() {
(NanoTokens::zero(), NanoTokens::zero())
} else {
self.pay_for_records(&cost_map, wallet).await?
};
Ok((storage_cost, royalty_fees, skipped_chunks))
pub trait Data: ClientWrapper {
async fn get(&self, data_map_addr: XorName) -> Result<Bytes, GetError> {
self.client().get(data_map_addr).await
}

async fn pay_for_records(
&mut self,
cost_map: &BTreeMap<XorName, (MainPubkey, PaymentQuote, Vec<u8>)>,
wallet: &mut HotWallet,
) -> Result<(NanoTokens, NanoTokens), PayError> {
// Before wallet progress, there shall be no `unconfirmed_spend_requests`
self.resend_pending_transactions(wallet).await;

let total_cost = wallet.local_send_storage_payment(cost_map)?;

// send to network
tracing::trace!("Sending storage payment transfer to the network");
let spend_attempt_result = self
.send_spends(wallet.unconfirmed_spend_requests().iter())
.await;

tracing::trace!("send_spends of {} chunks completed", cost_map.len(),);

// Here is bit risky that for the whole bunch of spends to the chunks' store_costs and royalty_fee
// they will get re-paid again for ALL, if any one of the payment failed to be put.
if let Err(error) = spend_attempt_result {
tracing::warn!("The storage payment transfer was not successfully registered in the network: {error:?}. It will be retried later.");

// if we have a DoubleSpend error, lets remove the CashNote from the wallet
if let SendSpendsError::DoubleSpendAttemptedForCashNotes(spent_cash_notes) = &error {
for cash_note_key in spent_cash_notes {
tracing::warn!(
"Removing double spends CashNote from wallet: {cash_note_key:?}"
);
wallet.mark_notes_as_spent([cash_note_key]);
wallet.clear_specific_spend_request(*cash_note_key);
}
}

wallet.store_unconfirmed_spend_requests()?;

return Err(PayError::SendSpendsError(error));
} else {
tracing::info!("Spend has completed: {:?}", spend_attempt_result);
wallet.clear_confirmed_spend_requests();
}
tracing::trace!("clear up spends of {} chunks completed", cost_map.len(),);

Ok(total_cost)
async fn fetch_chunk(&self, addr: XorName) -> Result<Chunk, GetError> {
self.client().fetch_chunk(addr).await
}

/// Directly writes Chunks to the network in the form of immutable self encrypted chunks.
async fn upload_chunk(&self, chunk: Chunk, wallet: &mut HotWallet) -> Result<(), PutError> {
let xor_name = *chunk.name();
let (payment, payee) = self.get_recent_payment_for_addr(&xor_name, wallet)?;

self.store_chunk(chunk, payee, payment).await?;

wallet.api().remove_payment_transaction(&xor_name);

Ok(())
async fn fetch_from_data_map(&self, data_map: &DataMap) -> Result<Bytes, GetError> {
self.client().fetch_from_data_map(data_map).await
}

/// Actually store a chunk to a peer.
async fn store_chunk(
&self,
chunk: Chunk,
payee: PeerId,
payment: Payment,
) -> Result<(), PutError> {
tracing::debug!("Storing chunk: {chunk:?} to {payee:?}");

let key = chunk.network_address().to_record_key();

let record_kind = RecordKind::ChunkWithPayment;
let record = Record {
key: key.clone(),
value: try_serialize_record(&(payment, chunk.clone()), record_kind)
.map_err(|_| PutError::Serialization)?
.to_vec(),
publisher: None,
expires: None,
};

let put_cfg = PutRecordCfg {
put_quorum: Quorum::One,
retry_strategy: None,
use_put_record_to: Some(vec![payee]),
verification: None,
};
Ok(self.network.put_record(record, &put_cfg).await?)
async fn fetch_from_data_map_chunk(&self, data_map_bytes: &Bytes) -> Result<Bytes, GetError> {
self.client()
.fetch_from_data_map_chunk(data_map_bytes)
.await
}
}
Loading
Loading