feat: Implement tracing & thiserror error types #29

Merged: 3 commits, Sep 1, 2023
Changes from all commits
9 changes: 5 additions & 4 deletions Cargo.lock

Generated file; diff not rendered by default.

1 change: 1 addition & 0 deletions car-mirror/Cargo.toml
@@ -36,6 +36,7 @@ proptest = { version = "1.1", optional = true }
roaring-graphs = { version = "0.12", optional = true }
serde = "1.0.183"
serde_ipld_dagcbor = "0.4.0"
thiserror = "1.0.47"
tracing = "0.1"
tracing-subscriber = "0.3"
wnfs-common = "0.1.23"
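
The new `car-mirror/src/error.rs` module isn't shown in this excerpt, but the variants referenced throughout the diff below (`CarFileError`, `TooManyBytes`, `UnsupportedCodec`, `ParsingError`, `BlockStoreError`) suggest a thiserror enum roughly like the following sketch. Field types, messages, and derive details are assumptions, not the PR's actual code:

// Hypothetical sketch of car-mirror/src/error.rs, inferred from the variants
// used in this PR's diff. Exact fields and messages are assumptions.
use libipld_core::cid::Cid;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error {
    /// Wraps errors from reading or writing CAR files via iroh-car.
    #[error("CAR file error: {0}")]
    CarFileError(anyhow::Error),

    /// The peer sent more data than `Config::receive_maximum` allows.
    #[error("Received more than {receive_maximum} bytes ({block_bytes}), aborting request")]
    TooManyBytes {
        block_bytes: usize,
        receive_maximum: usize,
    },

    /// The CID's codec can't be used for extracting references.
    #[error("Unsupported codec in Cid: {cid}")]
    UnsupportedCodec { cid: Cid },

    /// The block bytes couldn't be parsed as the codec indicated by the CID.
    #[error("Error parsing block: {0}")]
    ParsingError(anyhow::Error),

    /// An error bubbled up from the underlying `BlockStore`.
    #[error("BlockStore error: {0}")]
    BlockStoreError(anyhow::Error),
}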
124 changes: 107 additions & 17 deletions car-mirror/src/common.rs
@@ -1,15 +1,19 @@
use anyhow::{anyhow, bail, Result};
#![allow(unknown_lints)] // Because the `instrument` macro contains some `#[allow]`s that rust 1.66 doesn't know yet.

use anyhow::anyhow;
use bytes::Bytes;
use deterministic_bloom::runtime_size::BloomFilter;
use futures::TryStreamExt;
use iroh_car::{CarHeader, CarReader, CarWriter};
use libipld::{Ipld, IpldCodec};
use libipld_core::{cid::Cid, codec::References};
use std::io::Cursor;
use tracing::{debug, instrument, trace, warn};
use wnfs_common::BlockStore;

use crate::{
dag_walk::DagWalk,
error::Error,
incremental_verification::{BlockState, IncrementalDagVerification},
messages::{Bloom, PullRequest, PushResponse},
};
@@ -63,12 +67,13 @@ pub struct CarFile {
///
/// It returns a `CarFile` of (a subset of) all blocks below `root` that
/// are thought to be missing on the receiving end.
#[instrument(skip(config, store))]
pub async fn block_send(
root: Cid,
last_state: Option<ReceiverState>,
config: &Config,
store: &impl BlockStore,
) -> Result<CarFile> {
) -> Result<CarFile, Error> {
let ReceiverState {
ref missing_subgraph_roots,
have_cids_bloom,
@@ -86,6 +91,30 @@ pub async fn block_send(
.try_collect()
.await?;

if subgraph_roots.len() != missing_subgraph_roots.len() {
let unrelated_roots = missing_subgraph_roots
.iter()
.filter(|cid| !subgraph_roots.contains(cid))
.map(|cid| cid.to_string())
.collect::<Vec<_>>()
.join(", ");

warn!(
unrelated_roots = %unrelated_roots,
"got asked for DAG-unrelated blocks"
);
}

if let Some(bloom) = &have_cids_bloom {
debug!(
size_bits = bloom.as_bytes().len() * 8,
hash_count = bloom.hash_count(),
ones_count = bloom.count_ones(),
estimated_fpr = bloom.current_false_positive_rate(),
"received 'have cids' bloom",
);
}

let bloom = have_cids_bloom.unwrap_or_else(|| BloomFilter::new_with(1, Box::new([0]))); // An empty bloom that contains nothing

let mut writer = CarWriter::new(
@@ -102,16 +131,35 @@ pub async fn block_send(
Vec::new(),
);

writer.write_header().await?;
writer
.write_header()
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?;

let mut block_bytes = 0;
let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone());
while let Some((cid, block)) = dag_walk.next(store).await? {
if bloom.contains(&cid.to_bytes()) && !subgraph_roots.contains(&cid) {
debug!(
cid = %cid,
bloom_contains = bloom.contains(&cid.to_bytes()),
subgraph_roots_contains = subgraph_roots.contains(&cid),
"skipped writing block"
);
continue;
}

writer.write(cid, &block).await?;
debug!(
cid = %cid,
num_bytes = block.len(),
frontier_size = dag_walk.frontier.len(),
"writing block to CAR",
);

writer
.write(cid, &block)
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?;

// TODO(matheus23): Count the actual bytes sent?
// At the moment, this is a rough estimate. iroh-car could be improved to return the written bytes.
@@ -122,7 +170,11 @@ pub async fn block_send(
}

Ok(CarFile {
bytes: writer.finish().await?.into(),
bytes: writer
.finish()
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?
.into(),
})
}

@@ -134,33 +186,49 @@ pub async fn block_send(
/// It takes a `CarFile`, verifies that its contents are related to the
/// `root` and returns some information to help the block sending side
/// figure out what blocks to send next.
#[instrument(skip(last_car, config, store), fields(car_bytes = last_car.as_ref().map(|car| car.bytes.len())))]
pub async fn block_receive(
root: Cid,
last_car: Option<CarFile>,
config: &Config,
store: &impl BlockStore,
) -> Result<ReceiverState> {
) -> Result<ReceiverState, Error> {
let mut dag_verification = IncrementalDagVerification::new([root], store).await?;

if let Some(car) = last_car {
let mut reader = CarReader::new(Cursor::new(car.bytes)).await?;
let mut reader = CarReader::new(Cursor::new(car.bytes))
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?;
let mut block_bytes = 0;

while let Some((cid, vec)) = reader.next_block().await? {
while let Some((cid, vec)) = reader
.next_block()
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?
{
let block = Bytes::from(vec);

debug!(
cid = %cid,
num_bytes = block.len(),
"reading block from CAR",
);

block_bytes += block.len();
if block_bytes > config.receive_maximum {
bail!(
"Received more than {} bytes ({block_bytes}), aborting request.",
config.receive_maximum
);
return Err(Error::TooManyBytes {
block_bytes,
receive_maximum: config.receive_maximum,
});
}

match dag_verification.block_state(cid) {
BlockState::Have => continue,
BlockState::Unexpected => {
eprintln!("Warn: Received block {cid} out of order, may be due to bloom false positive.");
trace!(
cid = %cid,
"received block out of order (possibly due to bloom false positive)"
);

Review thread on this trace! call:

Reviewer: trace! is the lowest level. Should this be a warn?

Member Author: It's trace intentionally. I used to think of this as a warning: it's one of the things that may happen more often if something goes wrong. But it's also something that just happens during normal execution, so it's more like info.

Reviewer: Ah ok, yeah. From the comment it made it sound worse than it is :).

break;
}
BlockState::Want => {
@@ -188,6 +256,14 @@ pub async fn block_receive(
});
}

if missing_subgraph_roots.is_empty() {
// We're done. No need to compute a bloom.
return Ok(ReceiverState {
missing_subgraph_roots,
have_cids_bloom: None,
});
}

let mut bloom =
BloomFilter::new_from_fpr_po2(bloom_capacity, (config.bloom_fpr)(bloom_capacity));

@@ -196,6 +272,15 @@ pub async fn block_receive(
.iter()
.for_each(|cid| bloom.insert(&cid.to_bytes()));

debug!(
inserted_elements = bloom_capacity,
size_bits = bloom.as_bytes().len() * 8,
hash_count = bloom.hash_count(),
ones_count = bloom.count_ones(),
estimated_fpr = bloom.current_false_positive_rate(),
"built 'have cids' bloom",
);

Ok(ReceiverState {
missing_subgraph_roots,
have_cids_bloom: Some(bloom),
@@ -207,13 +292,18 @@ pub async fn block_receive(
/// This will error out if
/// - the codec is not supported
/// - the block can't be parsed.
pub fn references<E: Extend<Cid>>(cid: Cid, block: impl AsRef<[u8]>, mut refs: E) -> Result<E> {
pub fn references<E: Extend<Cid>>(
cid: Cid,
block: impl AsRef<[u8]>,
mut refs: E,
) -> Result<E, Error> {
let codec: IpldCodec = cid
.codec()
.try_into()
.map_err(|_| anyhow!("Unsupported codec in Cid: {cid}"))?;
.map_err(|_| Error::UnsupportedCodec { cid })?;

<Ipld as References<IpldCodec>>::references(codec, &mut Cursor::new(block), &mut refs)?;
<Ipld as References<IpldCodec>>::references(codec, &mut Cursor::new(block), &mut refs)
.map_err(Error::ParsingError)?;
Ok(refs)
}

@@ -310,7 +400,7 @@ impl Default for Config {
send_minimum: 128 * 1024, // 128KiB
receive_maximum: 512 * 1024, // 512KiB
max_roots_per_round: 1000, // max. ~41KB of CIDs
bloom_fpr: |num_of_elems| 0.1 / num_of_elems as f64,
bloom_fpr: |num_of_elems| f64::min(0.001, 0.1 / num_of_elems as f64),
}
}
}
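
All of the new `#[instrument]` spans and `debug!`/`trace!` events above are only visible if the consuming application (or test harness) installs a subscriber. Below is a minimal sketch using the `tracing-subscriber` dependency added in Cargo.toml; this helper is illustrative and not part of the PR:

// Sketch only: install a global fmt subscriber so the spans and events
// added in common.rs are printed. DEBUG shows the bloom and CAR events;
// use tracing::Level::TRACE to also see the "received block out of order"
// events discussed in the review thread above.
fn init_tracing() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();
}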
15 changes: 9 additions & 6 deletions car-mirror/src/dag_walk.rs
@@ -1,5 +1,4 @@
use crate::common::references;
use anyhow::Result;
use crate::{common::references, error::Error};
use bytes::Bytes;
use futures::{stream::try_unfold, Stream};
use libipld_core::cid::Cid;
@@ -54,7 +53,7 @@ impl DagWalk {
/// Return the next node in the traversal.
///
/// Returns `None` if no nodes are left to be visited.
pub async fn next(&mut self, store: &impl BlockStore) -> Result<Option<(Cid, Bytes)>> {
pub async fn next(&mut self, store: &impl BlockStore) -> Result<Option<(Cid, Bytes)>, Error> {
let cid = loop {
let popped = if self.breadth_first {
self.frontier.pop_back()
@@ -75,7 +74,10 @@ impl DagWalk {
// TODO: Two opportunities for performance improvement:
// - skip Raw CIDs. They can't have further links (but needs adjustment to this function's return type)
// - run multiple `get_block` calls concurrently
let block = store.get_block(&cid).await?;
let block = store
.get_block(&cid)
.await
.map_err(Error::BlockStoreError)?;
for ref_cid in references(cid, &block, Vec::new())? {
if !self.visited.contains(&ref_cid) {
self.frontier.push_front(ref_cid);
@@ -89,7 +91,7 @@ impl DagWalk {
pub fn stream(
self,
store: &impl BlockStore,
) -> impl Stream<Item = Result<(Cid, Bytes)>> + Unpin + '_ {
) -> impl Stream<Item = Result<(Cid, Bytes), Error>> + Unpin + '_ {
Box::pin(try_unfold(self, move |mut this| async move {
let maybe_block = this.next(store).await?;
Ok(maybe_block.map(|b| (b, this)))
@@ -110,7 +112,7 @@ impl DagWalk {
}

/// Skip a node from the traversal for now.
pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<()> {
pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<(), Error> {
let (cid, bytes) = block;
let refs = references(cid, bytes, HashSet::new())?;
self.visited.insert(cid);
@@ -124,6 +126,7 @@ mod tests {
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Result;
use futures::TryStreamExt;
use libipld::Ipld;
use wnfs_common::MemoryBlockStore;
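
With `DagWalk::next` and `DagWalk::stream` now returning `Result<_, Error>` instead of `anyhow::Result`, callers can still drive the stream with `futures::TryStreamExt` as before. A hypothetical usage sketch follows; crate and module paths such as `car_mirror::error` are assumed to be public and are not confirmed by this diff:

// Hypothetical sketch: collect every CID reachable from `root` using the
// now-fallible stream. Paths and visibility are assumptions.
use car_mirror::{dag_walk::DagWalk, error::Error};
use futures::TryStreamExt;
use libipld_core::cid::Cid;
use wnfs_common::MemoryBlockStore;

async fn collect_cids(root: Cid, store: &MemoryBlockStore) -> Result<Vec<Cid>, Error> {
    DagWalk::breadth_first(vec![root])
        .stream(store)
        .map_ok(|(cid, _block)| cid)
        .try_collect()
        .await
}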