Skip to content

Commit 5d30c2a

Browse files
committed
use callback lock to watch internal state
1 parent 042e5b1 commit 5d30c2a

File tree

1 file changed

+62
-27
lines changed

1 file changed

+62
-27
lines changed

src/store/bao_file.rs

+62-27
Original file line numberDiff line numberDiff line change
@@ -340,12 +340,17 @@ impl<T> BaoFileHandleWeak<T> {
340340
}
341341
}
342342

343+
/// a type alias which represents the callback which is executed after
344+
/// the write guard is dropped
345+
type AfterLockWriteCb<T> = Box<dyn Fn(&BaoFileStorage<T>) + Send + Sync + 'static>;
346+
343347
/// The inner part of a bao file handle.
344348
#[derive(Debug)]
345349
pub struct BaoFileHandleInner<T> {
346-
pub(crate) storage: tokio::sync::RwLock<BaoFileStorage<T>>,
350+
pub(crate) storage: CallbackLock<BaoFileStorage<T>, AfterLockWriteCb<T>>,
347351
config: Arc<BaoFileConfig>,
348352
hash: Hash,
353+
rx: tokio::sync::watch::Receiver<StorageMeta>,
349354
}
350355

351356
/// A cheaply cloneable handle to a bao file, including the hash and the configuration.
@@ -511,21 +516,55 @@ enum HandleChange {
511516
// later: size verified
512517
}
513518

519+
/// struct which stores simple metadata about the [BaoFileHandle] in a way that is
520+
/// accessible in synchronous function calls
521+
#[derive(Debug)]
522+
struct StorageMeta {
523+
complete: bool,
524+
size: Result<u64, io::Error>,
525+
}
526+
527+
impl StorageMeta {
528+
fn new<T: bao_tree::io::sync::ReadAt>(storage: &BaoFileStorage<T>) -> Self {
529+
let size = match storage {
530+
BaoFileStorage::Complete(mem) => Ok(mem.data_size()),
531+
BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()),
532+
BaoFileStorage::IncompleteFile(file) => file.current_size(),
533+
};
534+
StorageMeta {
535+
complete: matches!(storage, BaoFileStorage::Complete(_)),
536+
size,
537+
}
538+
}
539+
}
540+
514541
impl<T> BaoFileHandle<T>
515542
where
516543
T: bao_tree::io::sync::ReadAt,
517544
{
545+
/// internal helper function to initialize a new instance of self
546+
fn new_inner(storage: BaoFileStorage<T>, config: Arc<BaoFileConfig>, hash: Hash) -> Self {
547+
let (tx, rx) = tokio::sync::watch::channel(StorageMeta::new(&storage));
548+
Self(Arc::new(BaoFileHandleInner {
549+
storage: CallbackLock::new(
550+
storage,
551+
Box::new(move |storage: &BaoFileStorage<T>| {
552+
let _ = tx.send(StorageMeta::new(storage));
553+
}),
554+
),
555+
config,
556+
hash,
557+
rx,
558+
}))
559+
}
560+
518561
/// Create a new bao file handle.
519562
///
520563
/// This will create a new file handle with an empty memory storage.
521564
/// Since there are very likely to be many of these, we use an arc rwlock
522565
pub fn incomplete_mem(config: Arc<BaoFileConfig>, hash: Hash) -> Self {
523566
let storage = BaoFileStorage::incomplete_mem();
524-
Self(Arc::new(BaoFileHandleInner {
525-
storage: tokio::sync::RwLock::new(storage),
526-
config,
527-
hash,
528-
}))
567+
Self::new_inner(storage, config, hash)
529568
}
530569

531570
/// Create a new bao file handle with a partial file.
@@ -536,11 +575,7 @@ where
536575
outboard: create_read_write(&paths.outboard)?,
537576
sizes: create_read_write(&paths.sizes)?,
538577
});
539-
Ok(Self(Arc::new(BaoFileHandleInner {
540-
storage: tokio::sync::RwLock::new(storage),
541-
config,
542-
hash,
543-
})))
578+
Ok(Self::new_inner(storage, config, hash))
544579
}
545580

546581
/// Create a new complete bao file handle.
@@ -551,11 +586,7 @@ where
551586
outboard: MemOrFile<Bytes, FileAndSize<T>>,
552587
) -> Self {
553588
let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard });
554-
Self(Arc::new(BaoFileHandleInner {
555-
storage: tokio::sync::RwLock::new(storage),
556-
config,
557-
hash,
558-
}))
589+
Self::new_inner(storage, config, hash)
559590
}
560591

561592
/// Transform the storage in place. If the transform fails, the storage will
@@ -573,10 +604,7 @@ where
573604

574605
/// True if the file is complete.
575606
pub fn is_complete(&self) -> bool {
576-
matches!(
577-
self.storage.try_read().unwrap().deref(),
578-
BaoFileStorage::Complete(_)
579-
)
607+
self.rx.borrow().deref().complete
580608
}
581609

582610
/// An AsyncSliceReader for the data file.
@@ -596,18 +624,25 @@ where
596624
}
597625

598626
/// The most precise known total size of the data file.
599-
pub fn current_size(&self) -> io::Result<u64> {
600-
match self.storage.try_read().unwrap().deref() {
601-
BaoFileStorage::Complete(mem) => Ok(mem.data_size()),
602-
BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()),
603-
BaoFileStorage::IncompleteFile(file) => file.current_size(),
604-
}
627+
pub fn current_size(&self) -> Result<u64, io::ErrorKind> {
628+
self.rx
629+
.borrow()
630+
.size
631+
.as_ref()
632+
// NB: we return the io::ErrorKind here
633+
// because io::Error is !Clone
634+
.map_err(|e| e.kind())
635+
.copied()
605636
}
606637

607638
/// The outboard for the file.
608639
pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader<T>>> {
609640
let root = self.hash.into();
610-
let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
641+
let tree = BaoTree::new(
642+
self.current_size()
643+
.map_err(|kind| io::Error::new(kind, "an io error has occurred"))?,
644+
IROH_BLOCK_SIZE,
645+
);
611646
let outboard = self.outboard_reader();
612647
Ok(PreOrderOutboard {
613648
root,

0 commit comments

Comments
 (0)