diff --git a/src/store/fs/tests.rs b/src/store/fs/tests.rs index 85540eb8..d01bae60 100644 --- a/src/store/fs/tests.rs +++ b/src/store/fs/tests.rs @@ -1,6 +1,7 @@ use std::io::Cursor; use bao_tree::ChunkRanges; +use bytes::BytesMut; use iroh_io::AsyncSliceReaderExt; use crate::{ @@ -8,9 +9,9 @@ use crate::{ bao_file::test_support::{ decode_response_into_batch, make_wire_data, random_test_data, simulate_remote, validate, }, - Map as _, MapEntryMut, MapMut, ReadableStore, Store as _, + Map as _, MapEntry, MapEntryMut, MapMut, ReadableStore, Store as _, ValidateProgress, }, - util::raw_outboard, + util::{progress::AsyncChannelProgressSender, raw_outboard}, IROH_BLOCK_SIZE, }; @@ -809,3 +810,34 @@ async fn actor_store_smoke() { db.sync().await.unwrap(); db.dump().await.unwrap(); } + +#[tokio::test] +async fn verifiable_stream_smoke() -> testresult::TestResult { + let db1 = crate::store::mem::Store::new(); + let db2 = crate::store::mem::Store::new(); + + const SIZE: usize = 16 * 1024 * 1024; + let data = random_test_data(SIZE); + let tag = db1.import_bytes(Bytes::from(data), BlobFormat::Raw).await?; + let mut buffer = BytesMut::with_capacity(SIZE + 1024 * 1024); + let entry = db1.get(tag.hash()).await?.expect("We just wrote this hash"); + + entry.write_verifiable_stream(0, &mut buffer).await?; + + db2.import_verifiable_stream(*tag.hash(), SIZE as u64, 0, buffer.freeze()) + .await?; + + let (tx, rx) = async_channel::bounded(128); + let handle = tokio::spawn(async move { + while let Ok(progress) = rx.recv().await { + if let ValidateProgress::Abort(err) = progress { + panic!("Got an error: {err}"); + } + } + }); + db2.validate(false, AsyncChannelProgressSender::new(tx).boxed()) + .await?; + handle.await?; + + Ok(()) +} diff --git a/src/store/traits.rs b/src/store/traits.rs index e7982ebb..1c6ade5d 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -3,13 +3,18 @@ use std::{collections::BTreeSet, future::Future, io, path::PathBuf, time::Durati pub use bao_tree; use bao_tree::{ - io::fsm::{BaoContentItem, Outboard}, + io::{ + fsm::{ + encode_ranges_validated, BaoContentItem, Outboard, ResponseDecoder, ResponseDecoderNext, + }, + DecodeError, + }, BaoTree, ChunkRanges, }; use bytes::Bytes; use futures_lite::{Stream, StreamExt}; use genawaiter::rc::{Co, Gen}; -use iroh_io::AsyncSliceReader; +use iroh_io::{AsyncSliceReader, AsyncStreamReader, AsyncStreamWriter}; pub use range_collections; use serde::{Deserialize, Serialize}; use tokio::io::AsyncRead; @@ -90,6 +95,31 @@ pub trait MapEntry: std::fmt::Debug + Clone + Send + Sync + 'static { fn outboard(&self) -> impl Future> + Send; /// A future that resolves to a reader that can be used to read the data fn data_reader(&self) -> impl Future> + Send; + + /// Encodes data and outboard into a [`AsyncStreamWriter`]. + /// + /// Data and outboard parts will be interleaved. + /// + /// `offset` is the byte offset in the blob to start the stream from. It will be rounded down to + /// the next chunk group. + /// + /// Returns immediately without error if `start` is equal or larger than the entry's size. + fn write_verifiable_stream<'a>( + &'a self, + offset: u64, + writer: impl AsyncStreamWriter + 'a, + ) -> impl Future> + 'a { + async move { + let size = self.size().value(); + if offset >= size { + return Ok(()); + } + let ranges = range_from_offset_and_length(offset, size - offset); + let (outboard, data) = tokio::try_join!(self.outboard(), self.data_reader())?; + encode_ranges_validated(data, outboard, &ranges, writer).await?; + Ok(()) + } + } } /// A generic map from hashes to bao blobs (blobs with bao outboards). @@ -341,6 +371,74 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug { self.import_stream(stream, format, progress) } + /// Import a blob from a verified stream, as emitted by [`MapEntry::write_verifiable_stream`]; + /// + /// `total_size` is the total size of the blob as reported by the remote. + /// `offset` is the byte offset in the blob where the stream starts. It will be rounded + /// to the next chunk group. + fn import_verifiable_stream<'a>( + &'a self, + hash: Hash, + total_size: u64, + offset: u64, + reader: impl AsyncStreamReader + 'a, + ) -> impl Future> + 'a { + async move { + if offset >= total_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "offset must not be greater than total_size", + )); + } + let entry = self.get_or_create(hash, total_size).await?; + let mut bw = entry.batch_writer().await?; + + let ranges = range_from_offset_and_length(offset, total_size - offset); + let mut decoder = ResponseDecoder::new( + hash.into(), + ranges, + BaoTree::new(total_size, IROH_BLOCK_SIZE), + reader, + ); + let size = decoder.tree().size(); + let mut buf = Vec::new(); + let is_complete = loop { + decoder = match decoder.next().await { + ResponseDecoderNext::More((decoder, item)) => { + let item = match item { + Err(DecodeError::LeafNotFound(_) | DecodeError::ParentNotFound(_)) => { + break false + } + Err(err) => return Err(err.into()), + Ok(item) => item, + }; + match &item { + BaoContentItem::Parent(_) => { + buf.push(item); + } + BaoContentItem::Leaf(_) => { + buf.push(item); + let batch = std::mem::take(&mut buf); + bw.write_batch(size, batch).await?; + } + } + decoder + } + ResponseDecoderNext::Done(_reader) => { + debug_assert!(buf.is_empty(), "last node of bao tree must be leaf node"); + break true; + } + }; + }; + bw.sync().await?; + drop(bw); + if is_complete { + self.insert_complete(entry).await?; + } + Ok(()) + } + } + /// Set a tag fn set_tag( &self, @@ -386,6 +484,11 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug { } } +fn range_from_offset_and_length(offset: u64, length: u64) -> bao_tree::ChunkRanges { + let ranges = bao_tree::ByteRanges::from(offset..(offset + length)); + bao_tree::io::round_up_to_chunks(&ranges) +} + async fn validate_impl( store: &impl Store, repair: bool,