Skip to content

Commit

Permalink
chore(autonomi): add logging to emvlib and autonomi
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Oct 9, 2024
1 parent 98d0e15 commit 52ed976
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 57 deletions.
10 changes: 8 additions & 2 deletions autonomi/src/client/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@ pub enum DataError {
}

pub fn str_to_xorname(addr: &str) -> Result<XorName, DataError> {
let bytes = hex::decode(addr).map_err(|_| DataError::InvalidHexString)?;
let xor = XorName(bytes.try_into().map_err(|_| DataError::InvalidXorName)?);
let bytes = hex::decode(addr).map_err(|err| {
error!("Failed to decode hex string: {err:?}");
DataError::InvalidHexString
})?;
let xor = XorName(bytes.try_into().map_err(|err| {
error!("Failed to convert bytes to XorName: {err:?}");
DataError::InvalidXorName
})?);
Ok(xor)
}

Expand Down
71 changes: 52 additions & 19 deletions autonomi/src/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl Client {
/// Fetch a piece of self-encrypted data from the network, by its data map
/// XOR address.
pub async fn get(&self, data_map_addr: XorName) -> Result<Bytes, GetError> {
info!("Fetching file from data_map: {data_map_addr:?}");
let data_map_chunk = self.fetch_chunk(data_map_addr).await?;
let data = self
.fetch_from_data_map_chunk(data_map_chunk.value())
Expand All @@ -94,7 +95,7 @@ impl Client {

/// Get a raw chunk from the network.
pub async fn fetch_chunk(&self, addr: XorName) -> Result<Chunk, GetError> {
tracing::info!("Getting chunk: {addr:?}");
info!("Getting chunk: {addr:?}");

let key = NetworkAddress::from_chunk_address(ChunkAddress::new(addr)).to_record_key();

Expand All @@ -106,7 +107,11 @@ impl Client {
is_register: false,
};

let record = self.network.get_record_from_network(key, &get_cfg).await?;
let record = self
.network
.get_record_from_network(key, &get_cfg)
.await
.inspect_err(|err| error!("Error fetching chunk: {err:?}"))?;
let header = RecordHeader::from_record(&record)?;

if let RecordKind::Chunk = header.kind {
Expand All @@ -122,24 +127,30 @@ impl Client {
let mut encrypted_chunks = vec![];

for info in data_map.infos() {
let chunk = self.fetch_chunk(info.dst_hash).await?;
let chunk = self
.fetch_chunk(info.dst_hash)
.await
.inspect_err(|err| error!("Error fetching chunk {:?}: {err:?}", info.dst_hash))?;
let chunk = EncryptedChunk {
index: info.index,
content: chunk.value,
};
encrypted_chunks.push(chunk);
}

let data = decrypt_full_set(data_map, &encrypted_chunks)
.map_err(|e| GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e)))?;
let data = decrypt_full_set(data_map, &encrypted_chunks).map_err(|e| {
error!("Error decrypting encrypted_chunks: {e:?}");
GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e))
})?;

Ok(data)
}

/// Unpack a wrapped data map and fetch all bytes using self-encryption.
async fn fetch_from_data_map_chunk(&self, data_map_bytes: &Bytes) -> Result<Bytes, GetError> {
let mut data_map_level: DataMapLevel =
rmp_serde::from_slice(data_map_bytes).map_err(GetError::InvalidDataMap)?;
let mut data_map_level: DataMapLevel = rmp_serde::from_slice(data_map_bytes)
.map_err(GetError::InvalidDataMap)
.inspect_err(|err| error!("Error deserializing data map: {err:?}"))?;

loop {
let data_map = match &data_map_level {
Expand All @@ -152,8 +163,10 @@ impl Client {
match &data_map_level {
DataMapLevel::First(_) => break Ok(data),
DataMapLevel::Additional(_) => {
data_map_level =
rmp_serde::from_slice(&data).map_err(GetError::InvalidDataMap)?;
data_map_level = rmp_serde::from_slice(&data).map_err(|err| {
error!("Error deserializing data map: {err:?}");
GetError::InvalidDataMap(err)
})?;
continue;
}
};
Expand All @@ -165,8 +178,12 @@ impl Client {
pub async fn put(&self, data: Bytes, wallet: &Wallet) -> Result<XorName, PutError> {
let now = sn_networking::target_arch::Instant::now();
let (data_map_chunk, chunks) = encrypt(data)?;
info!(
"Uploading chunk to the network at: {:?}",
data_map_chunk.address()
);

tracing::debug!("Encryption took: {:.2?}", now.elapsed());
debug!("Encryption took: {:.2?}", now.elapsed());

let map_xor_name = *data_map_chunk.address().xorname();
let mut xor_names = vec![map_xor_name];
Expand All @@ -176,18 +193,27 @@ impl Client {
}

// Pay for all chunks + data map chunk
let (payment_proofs, _free_chunks) = self.pay(xor_names.into_iter(), wallet).await?;
info!("Paying for {} addresses", xor_names.len());
let (payment_proofs, _free_chunks) = self
.pay(xor_names.into_iter(), wallet)
.await
.inspect_err(|err| error!("Error paying for data: {err:?}"))?;

// Upload data map
if let Some(proof) = payment_proofs.get(&map_xor_name) {
debug!("Uploading data map chunk: {map_xor_name:?}");
self.upload_chunk(data_map_chunk.clone(), proof.clone())
.await?;
.await
.inspect_err(|err| error!("Error uploading data map chunk: {err:?}"))?;
}

// Upload the rest of the chunks
debug!("Uploading {} chunks", chunks.len());
for chunk in chunks {
if let Some(proof) = payment_proofs.get(chunk.name()) {
self.upload_chunk(chunk, proof.clone()).await?;
self.upload_chunk(chunk, proof.clone())
.await
.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
}
}

Expand All @@ -200,7 +226,11 @@ impl Client {
let now = std::time::Instant::now();
let (data_map_chunk, chunks) = encrypt(data)?;

tracing::debug!("Encryption took: {:.2?}", now.elapsed());
debug!("Encryption took: {:.2?}", now.elapsed());
info!(
"Calculating cost for chunk to the network at: {:?}",
data_map_chunk.address()
);

let map_xor_name = *data_map_chunk.address().xorname();
let mut content_addrs = vec![map_xor_name];
Expand All @@ -209,7 +239,10 @@ impl Client {
content_addrs.push(*chunk.name());
}

let cost_map = self.get_store_quotes(content_addrs.into_iter()).await?;
let cost_map = self
.get_store_quotes(content_addrs.into_iter())
.await
.inspect_err(|err| error!("Error getting store quotes: {err:?}"))?;
let total_cost = AttoTokens::from_atto(
cost_map
.values()
Expand Down Expand Up @@ -238,7 +271,7 @@ impl Client {

let proofs = construct_proofs(&cost_map, &payments);

tracing::trace!(
trace!(
"Chunk payments of {} chunks completed. {} chunks were free / already paid for",
proofs.len(),
skipped_chunks.len()
Expand Down Expand Up @@ -275,7 +308,7 @@ impl Client {
async fn store_chunk(&self, chunk: Chunk, payment: ProofOfPayment) -> Result<(), PutError> {
let storing_node = payment.to_peer_id_payee().expect("Missing node Peer ID");

tracing::debug!("Storing chunk: {chunk:?} to {:?}", storing_node);
debug!("Storing chunk: {chunk:?} to {:?}", storing_node);

let key = chunk.network_address().to_record_key();

Expand Down Expand Up @@ -337,10 +370,10 @@ async fn fetch_store_quote_with_retries(
}
Err(err) if retries < 2 => {
retries += 1;
tracing::error!("Error while fetching store quote: {err:?}, retry #{retries}");
error!("Error while fetching store quote: {err:?}, retry #{retries}");
}
Err(err) => {
tracing::error!(
error!(
"Error while fetching store quote: {err:?}, stopping after {retries} retries"
);
break Err(PayError::CouldNotGetStoreQuote(content_addr));
Expand Down
1 change: 1 addition & 0 deletions autonomi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl Client {
let _handle = sn_networking::target_arch::spawn(async move {
for addr in peers {
if let Err(err) = network_clone.dial(addr.clone()).await {
error!("Failed to dial addr={addr} with err: {err:?}");
eprintln!("addr={addr} Failed to dial: {err:?}");
};
}
Expand Down
82 changes: 65 additions & 17 deletions autonomi/src/client/registers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl Client {

/// Fetches a Register from the network.
pub async fn register_get(&self, address: RegisterAddress) -> Result<Register, RegisterError> {
info!("Fetching register at addr: {address}");
let network_address = NetworkAddress::from_register_address(address);
let key = network_address.to_record_key();

Expand All @@ -104,6 +105,7 @@ impl Client {
}
// manage forked register case
Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { result_map })) => {
debug!("Forked register detected for {address:?} merging forks");
let mut registers: Vec<SignedRegister> = vec![];
for (_, (record, _)) in result_map {
registers.push(
Expand All @@ -119,13 +121,17 @@ impl Client {
});
register
}
Err(e) => Err(e)?,
Err(e) => {
error!("Failed to get register {address:?} from network: {e}");
Err(e)?
}
};

// Make sure the fetched record contains valid CRDT operations
register
.verify()
.map_err(|_| RegisterError::FailedVerification)?;
register.verify().map_err(|err| {
error!("Failed to verify register {address:?} with error: {err}");
RegisterError::FailedVerification
})?;

Ok(Register { inner: register })
}
Expand All @@ -142,21 +148,38 @@ impl Client {
let mut register = signed_register
.clone()
.register()
.expect("register to be valid")
.map_err(|err| {
error!(
"Failed to get register from signed register as it failed verification: {err}"
);
RegisterError::FailedVerification
})?
.clone();

info!("Updating register at addr: {}", register.address());

// Get all current branches
let children: BTreeSet<EntryHash> = register.read().into_iter().map(|(e, _)| e).collect();

// Write the new value to all branches
let (_, op) = register
.write(new_value.into(), &children, &owner)
.map_err(RegisterError::Write)?;
.map_err(|err| {
error!(
"Failed to write to register at addr: {} : {err}",
register.address()
);
RegisterError::Write(err)
})?;

// Apply the operation to the register
signed_register
.add_op(op.clone())
.map_err(RegisterError::Write)?;
signed_register.add_op(op.clone()).map_err(|err| {
error!(
"Failed to add op to register at addr: {} : {err}",
register.address()
);
RegisterError::Write(err)
})?;

// Prepare the record for network storage
let record = Record {
Expand All @@ -183,7 +206,15 @@ impl Client {
};

// Store the updated register on the network
self.network.put_record(record, &put_cfg).await?;
self.network
.put_record(record, &put_cfg)
.await
.inspect_err(|err| {
error!(
"Failed to put record - register {:?} to the network: {err}",
register.address()
)
})?;

Ok(())
}
Expand All @@ -194,6 +225,7 @@ impl Client {
name: String,
owner: RegisterSecretKey,
) -> Result<AttoTokens, RegisterError> {
info!("Getting cost for register with name: {name}");
// get register address
let pk = owner.public_key();
let name = XorName::from_content_parts(&[name.as_bytes()]);
Expand Down Expand Up @@ -256,6 +288,8 @@ impl Client {
let mut register = ClientRegister::new(pk, name, permissions);
let address = NetworkAddress::from_register_address(*register.address());

info!("Creating register at address: {address}");

let entries = register
.read()
.into_iter()
Expand All @@ -264,21 +298,29 @@ impl Client {

let _ = register.write(value.into(), &entries, &owner);
let reg_xor = register.address().xorname();
let (payment_proofs, _skipped) = self.pay(std::iter::once(reg_xor), wallet).await?;
debug!("Paying for register at address: {address}");
let (payment_proofs, _skipped) = self
.pay(std::iter::once(reg_xor), wallet)
.await
.inspect_err(|err| {
error!("Failed to pay for register at address: {address} : {err}")
})?;
let proof = if let Some(proof) = payment_proofs.get(&reg_xor) {
proof
} else {
// register was skipped, meaning it was already paid for
error!("Register at address: {address} was already paid for");
return Err(RegisterError::Network(NetworkError::RegisterAlreadyExists));
};

let payee = proof
.to_peer_id_payee()
.ok_or(RegisterError::InvalidQuote)?;
let signed_register = register
.clone()
.into_signed(&owner)
.map_err(RegisterError::CouldNotSign)?;
.ok_or(RegisterError::InvalidQuote)
.inspect_err(|err| error!("Failed to get payee from payment proof: {err}"))?;
let signed_register = register.clone().into_signed(&owner).map_err(|err| {
error!("Failed to sign register at address: {address} : {err}");
RegisterError::CouldNotSign(err)
})?;

let record = Record {
key: address.to_record_key(),
Expand Down Expand Up @@ -306,7 +348,13 @@ impl Client {
verification: Some((VerificationKind::Network, get_cfg)),
};

self.network.put_record(record, &put_cfg).await?;
debug!("Storing register at address {address} to the network");
self.network
.put_record(record, &put_cfg)
.await
.inspect_err(|err| {
error!("Failed to put record - register {address} to the network: {err}")
})?;

Ok(Register {
inner: signed_register,
Expand Down
Loading

0 comments on commit 52ed976

Please sign in to comment.