Skip to content

Commit

Permalink
Merge pull request #2219 from RolandSherwin/evm_churn
Browse files Browse the repository at this point in the history
 fix(test): get data with churn test running
  • Loading branch information
RolandSherwin authored Oct 9, 2024
2 parents 8f6244b + 118fca8 commit f960d1b
Show file tree
Hide file tree
Showing 27 changed files with 804 additions and 825 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions autonomi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ data = []
vault = ["data"]
files = ["data"]
fs = ["tokio/fs", "files"]
local-discovery = ["sn_networking/local-discovery"]
local-discovery = ["sn_networking/local-discovery", "test_utils/local-discovery"]
registers = []

[dependencies]
Expand All @@ -34,7 +34,6 @@ rmp-serde = "1.1.1"
self_encryption = "~0.29.0"
serde = { version = "1.0.133", features = ["derive", "rc"] }
sn_networking = { path = "../sn_networking", version = "0.18.3" }
sn_peers_acquisition = { path = "../sn_peers_acquisition", version = "0.5.2" }
sn_protocol = { version = "0.17.10", path = "../sn_protocol" }
sn_registers = { path = "../sn_registers", version = "0.3.20" }
sn_transfers = { path = "../sn_transfers", version = "0.19.2" }
Expand Down
10 changes: 8 additions & 2 deletions autonomi/src/client/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@ pub enum DataError {
}

pub fn str_to_xorname(addr: &str) -> Result<XorName, DataError> {
let bytes = hex::decode(addr).map_err(|_| DataError::InvalidHexString)?;
let xor = XorName(bytes.try_into().map_err(|_| DataError::InvalidXorName)?);
let bytes = hex::decode(addr).map_err(|err| {
error!("Failed to decode hex string: {err:?}");
DataError::InvalidHexString
})?;
let xor = XorName(bytes.try_into().map_err(|err| {
error!("Failed to convert bytes to XorName: {err:?}");
DataError::InvalidXorName
})?);
Ok(xor)
}

Expand Down
73 changes: 54 additions & 19 deletions autonomi/src/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl Client {
/// Fetch a piece of self-encrypted data from the network, by its data map
/// XOR address.
pub async fn get(&self, data_map_addr: XorName) -> Result<Bytes, GetError> {
info!("Fetching file from data_map: {data_map_addr:?}");
let data_map_chunk = self.fetch_chunk(data_map_addr).await?;
let data = self
.fetch_from_data_map_chunk(data_map_chunk.value())
Expand All @@ -94,7 +95,7 @@ impl Client {

/// Get a raw chunk from the network.
pub async fn fetch_chunk(&self, addr: XorName) -> Result<Chunk, GetError> {
tracing::info!("Getting chunk: {addr:?}");
info!("Getting chunk: {addr:?}");

let key = NetworkAddress::from_chunk_address(ChunkAddress::new(addr)).to_record_key();

Expand All @@ -106,7 +107,11 @@ impl Client {
is_register: false,
};

let record = self.network.get_record_from_network(key, &get_cfg).await?;
let record = self
.network
.get_record_from_network(key, &get_cfg)
.await
.inspect_err(|err| error!("Error fetching chunk: {err:?}"))?;
let header = RecordHeader::from_record(&record)?;

if let RecordKind::Chunk = header.kind {
Expand All @@ -122,24 +127,30 @@ impl Client {
let mut encrypted_chunks = vec![];

for info in data_map.infos() {
let chunk = self.fetch_chunk(info.dst_hash).await?;
let chunk = self
.fetch_chunk(info.dst_hash)
.await
.inspect_err(|err| error!("Error fetching chunk {:?}: {err:?}", info.dst_hash))?;
let chunk = EncryptedChunk {
index: info.index,
content: chunk.value,
};
encrypted_chunks.push(chunk);
}

let data = decrypt_full_set(data_map, &encrypted_chunks)
.map_err(|e| GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e)))?;
let data = decrypt_full_set(data_map, &encrypted_chunks).map_err(|e| {
error!("Error decrypting encrypted_chunks: {e:?}");
GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e))
})?;

Ok(data)
}

/// Unpack a wrapped data map and fetch all bytes using self-encryption.
async fn fetch_from_data_map_chunk(&self, data_map_bytes: &Bytes) -> Result<Bytes, GetError> {
let mut data_map_level: DataMapLevel =
rmp_serde::from_slice(data_map_bytes).map_err(GetError::InvalidDataMap)?;
let mut data_map_level: DataMapLevel = rmp_serde::from_slice(data_map_bytes)
.map_err(GetError::InvalidDataMap)
.inspect_err(|err| error!("Error deserializing data map: {err:?}"))?;

loop {
let data_map = match &data_map_level {
Expand All @@ -152,8 +163,10 @@ impl Client {
match &data_map_level {
DataMapLevel::First(_) => break Ok(data),
DataMapLevel::Additional(_) => {
data_map_level =
rmp_serde::from_slice(&data).map_err(GetError::InvalidDataMap)?;
data_map_level = rmp_serde::from_slice(&data).map_err(|err| {
error!("Error deserializing data map: {err:?}");
GetError::InvalidDataMap(err)
})?;
continue;
}
};
Expand All @@ -165,8 +178,12 @@ impl Client {
pub async fn put(&self, data: Bytes, wallet: &Wallet) -> Result<XorName, PutError> {
let now = sn_networking::target_arch::Instant::now();
let (data_map_chunk, chunks) = encrypt(data)?;
info!(
"Uploading datamap chunk to the network at: {:?}",
data_map_chunk.address()
);

tracing::debug!("Encryption took: {:.2?}", now.elapsed());
debug!("Encryption took: {:.2?}", now.elapsed());

let map_xor_name = *data_map_chunk.address().xorname();
let mut xor_names = vec![map_xor_name];
Expand All @@ -176,18 +193,28 @@ impl Client {
}

// Pay for all chunks + data map chunk
let (payment_proofs, _free_chunks) = self.pay(xor_names.into_iter(), wallet).await?;
info!("Paying for {} addresses", xor_names.len());
let (payment_proofs, _free_chunks) = self
.pay(xor_names.into_iter(), wallet)
.await
.inspect_err(|err| error!("Error paying for data: {err:?}"))?;

// Upload data map
if let Some(proof) = payment_proofs.get(&map_xor_name) {
debug!("Uploading data map chunk: {map_xor_name:?}");
self.upload_chunk(data_map_chunk.clone(), proof.clone())
.await?;
.await
.inspect_err(|err| error!("Error uploading data map chunk: {err:?}"))?;
}

// Upload the rest of the chunks
debug!("Uploading {} chunks", chunks.len());
for chunk in chunks {
if let Some(proof) = payment_proofs.get(chunk.name()) {
self.upload_chunk(chunk, proof.clone()).await?;
let address = *chunk.address();
self.upload_chunk(chunk, proof.clone())
.await
.inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}"))?;
}
}

Expand All @@ -200,7 +227,7 @@ impl Client {
let now = std::time::Instant::now();
let (data_map_chunk, chunks) = encrypt(data)?;

tracing::debug!("Encryption took: {:.2?}", now.elapsed());
debug!("Encryption took: {:.2?}", now.elapsed());

let map_xor_name = *data_map_chunk.address().xorname();
let mut content_addrs = vec![map_xor_name];
Expand All @@ -209,7 +236,15 @@ impl Client {
content_addrs.push(*chunk.name());
}

let cost_map = self.get_store_quotes(content_addrs.into_iter()).await?;
info!(
"Calculating cost of storing {} chunks. Data map chunk at: {map_xor_name:?}",
content_addrs.len()
);

let cost_map = self
.get_store_quotes(content_addrs.into_iter())
.await
.inspect_err(|err| error!("Error getting store quotes: {err:?}"))?;
let total_cost = AttoTokens::from_atto(
cost_map
.values()
Expand Down Expand Up @@ -238,7 +273,7 @@ impl Client {

let proofs = construct_proofs(&cost_map, &payments);

tracing::trace!(
trace!(
"Chunk payments of {} chunks completed. {} chunks were free / already paid for",
proofs.len(),
skipped_chunks.len()
Expand Down Expand Up @@ -275,7 +310,7 @@ impl Client {
async fn store_chunk(&self, chunk: Chunk, payment: ProofOfPayment) -> Result<(), PutError> {
let storing_node = payment.to_peer_id_payee().expect("Missing node Peer ID");

tracing::debug!("Storing chunk: {chunk:?} to {:?}", storing_node);
debug!("Storing chunk: {chunk:?} to {:?}", storing_node);

let key = chunk.network_address().to_record_key();

Expand Down Expand Up @@ -337,10 +372,10 @@ async fn fetch_store_quote_with_retries(
}
Err(err) if retries < 2 => {
retries += 1;
tracing::error!("Error while fetching store quote: {err:?}, retry #{retries}");
error!("Error while fetching store quote: {err:?}, retry #{retries}");
}
Err(err) => {
tracing::error!(
error!(
"Error while fetching store quote: {err:?}, stopping after {retries} retries"
);
break Err(PayError::CouldNotGetStoreQuote(content_addr));
Expand Down
1 change: 1 addition & 0 deletions autonomi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl Client {
let _handle = sn_networking::target_arch::spawn(async move {
for addr in peers {
if let Err(err) = network_clone.dial(addr.clone()).await {
error!("Failed to dial addr={addr} with err: {err:?}");
eprintln!("addr={addr} Failed to dial: {err:?}");
};
}
Expand Down
Loading

0 comments on commit f960d1b

Please sign in to comment.