Skip to content

Commit a685f29

Browse files
committed
fix all tests
1 parent dff836e commit a685f29

File tree

2 files changed

+69
-76
lines changed

2 files changed

+69
-76
lines changed

src/store/bao_file.rs

+18-34
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{
1414
io,
1515
ops::{Deref, DerefMut},
1616
path::{Path, PathBuf},
17-
sync::{Arc, RwLock, Weak},
17+
sync::{Arc, Weak},
1818
};
1919

2020
use bao_tree::{
@@ -343,7 +343,7 @@ impl<T> BaoFileHandleWeak<T> {
343343
/// The inner part of a bao file handle.
344344
#[derive(Debug)]
345345
pub struct BaoFileHandleInner<T> {
346-
pub(crate) storage: RwLock<BaoFileStorage<T>>,
346+
pub(crate) storage: tokio::sync::RwLock<BaoFileStorage<T>>,
347347
config: Arc<BaoFileConfig>,
348348
hash: Hash,
349349
}
@@ -432,15 +432,9 @@ where
432432
return res;
433433
}
434434
};
435-
// otherwise, we have to spawn a task.
436-
let (handle, res) = tokio::task::spawn_blocking(move || {
437-
let storage = handle.storage.read().unwrap();
438-
let res = f(storage.deref());
439-
drop(storage);
440-
(handle, res)
441-
})
442-
.await
443-
.expect("spawn_blocking failed");
435+
let storage_guard = handle.storage.read().await;
436+
let res = f(storage_guard.deref());
437+
drop(storage_guard);
444438
*opt = Some(handle);
445439
res
446440
}
@@ -528,7 +522,7 @@ where
528522
pub fn incomplete_mem(config: Arc<BaoFileConfig>, hash: Hash) -> Self {
529523
let storage = BaoFileStorage::incomplete_mem();
530524
Self(Arc::new(BaoFileHandleInner {
531-
storage: RwLock::new(storage),
525+
storage: tokio::sync::RwLock::new(storage),
532526
config,
533527
hash,
534528
}))
@@ -543,7 +537,7 @@ where
543537
sizes: create_read_write(&paths.sizes)?,
544538
});
545539
Ok(Self(Arc::new(BaoFileHandleInner {
546-
storage: RwLock::new(storage),
540+
storage: tokio::sync::RwLock::new(storage),
547541
config,
548542
hash,
549543
})))
@@ -558,7 +552,7 @@ where
558552
) -> Self {
559553
let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard });
560554
Self(Arc::new(BaoFileHandleInner {
561-
storage: RwLock::new(storage),
555+
storage: tokio::sync::RwLock::new(storage),
562556
config,
563557
hash,
564558
}))
@@ -567,20 +561,20 @@ where
567561
/// Transform the storage in place. If the transform fails, the storage will
568562
/// be an immutable empty storage.
569563
#[cfg(feature = "fs-store")]
570-
pub(crate) fn transform(
564+
pub(crate) async fn transform(
571565
&self,
572-
f: impl FnOnce(BaoFileStorage<T>) -> io::Result<BaoFileStorage<T>>,
566+
f: impl AsyncFnOnce(BaoFileStorage<T>) -> io::Result<BaoFileStorage<T>>,
573567
) -> io::Result<()> {
574-
let mut lock = self.storage.write().unwrap();
568+
let mut lock = self.storage.write().await;
575569
let storage = lock.take();
576-
*lock = f(storage)?;
570+
*lock = f(storage).await?;
577571
Ok(())
578572
}
579573

580574
/// True if the file is complete.
581575
pub fn is_complete(&self) -> bool {
582576
matches!(
583-
self.storage.read().unwrap().deref(),
577+
self.storage.try_read().unwrap().deref(),
584578
BaoFileStorage::Complete(_)
585579
)
586580
}
@@ -603,7 +597,7 @@ where
603597

604598
/// The most precise known total size of the data file.
605599
pub fn current_size(&self) -> io::Result<u64> {
606-
match self.storage.read().unwrap().deref() {
600+
match self.storage.try_read().unwrap().deref() {
607601
BaoFileStorage::Complete(mem) => Ok(mem.data_size()),
608602
BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()),
609603
BaoFileStorage::IncompleteFile(file) => file.current_size(),
@@ -633,8 +627,8 @@ where
633627
}
634628

635629
/// This is the synchronous impl for writing a batch.
636-
fn write_batch(&self, size: u64, batch: &[BaoContentItem]) -> io::Result<HandleChange> {
637-
let mut storage = self.storage.write().unwrap();
630+
async fn write_batch(&self, size: u64, batch: &[BaoContentItem]) -> io::Result<HandleChange> {
631+
let mut storage = self.storage.write().await;
638632
match storage.deref_mut() {
639633
BaoFileStorage::IncompleteMem(mem) => {
640634
// check if we need to switch to file mode, otherwise write to memory
@@ -730,12 +724,7 @@ where
730724
let Some(handle) = self.0.take() else {
731725
return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
732726
};
733-
let (handle, change) = tokio::task::spawn_blocking(move || {
734-
let change = handle.write_batch(size, &batch);
735-
(handle, change)
736-
})
737-
.await
738-
.expect("spawn_blocking failed");
727+
let change = handle.write_batch(size, &batch).await;
739728
match change? {
740729
HandleChange::None => {}
741730
HandleChange::MemToFile => {
@@ -752,12 +741,7 @@ where
752741
let Some(handle) = self.0.take() else {
753742
return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
754743
};
755-
let (handle, res) = tokio::task::spawn_blocking(move || {
756-
let res = handle.storage.write().unwrap().sync_all();
757-
(handle, res)
758-
})
759-
.await
760-
.expect("spawn_blocking failed");
744+
let res = handle.storage.write().await.sync_all();
761745
self.0 = Some(handle);
762746
res
763747
}

src/store/fs.rs

+51-42
Original file line numberDiff line numberDiff line change
@@ -1109,7 +1109,7 @@ where
11091109
.into(),
11101110
)
11111111
})?;
1112-
block_for(self.fs.create_dir_all(parent))?;
1112+
self.fs.create_dir_all(parent).await?;
11131113
let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
11141114
let (tx, rx) = oneshot::channel();
11151115
self.tx
@@ -2007,7 +2007,11 @@ where
20072007
Ok(())
20082008
}
20092009

2010-
fn import(&mut self, tables: &mut Tables, cmd: Import) -> ActorResult<(TempTag, u64)> {
2010+
async fn import(
2011+
&mut self,
2012+
tables: &mut Tables<'_>,
2013+
cmd: Import,
2014+
) -> ActorResult<(TempTag, u64)> {
20112015
let Import {
20122016
content_id,
20132017
source: file,
@@ -2032,7 +2036,9 @@ where
20322036
external_path.display()
20332037
);
20342038
let data = Bytes::from(
2035-
block_for(self.fs.read(&external_path))
2039+
self.fs
2040+
.read(&external_path)
2041+
.await
20362042
.map_err(|e| ActorError::Io(e.into()))?,
20372043
);
20382044
DataLocation::Inline(data)
@@ -2424,41 +2430,45 @@ where
24242430
Ok(())
24252431
}
24262432

2427-
fn on_complete(
2433+
async fn on_complete(
24282434
&mut self,
2429-
tables: &mut Tables,
2435+
tables: &mut Tables<'_>,
24302436
entry: BaoFileHandle<T::File>,
24312437
) -> ActorResult<()> {
24322438
let hash = entry.hash();
24332439
let mut info = None;
24342440
tracing::trace!("on_complete({})", hash.to_hex());
2435-
entry.transform(|state| {
2436-
tracing::trace!("on_complete transform {:?}", state);
2437-
let entry = match complete_storage(
2438-
state,
2439-
&hash,
2440-
&self.options.path,
2441-
&self.options.inline,
2442-
tables.delete_after_commit,
2443-
self.fs.clone(),
2444-
)? {
2445-
Ok(entry) => {
2446-
// store the info so we can insert it into the db later
2447-
info = Some((
2448-
entry.data_size(),
2449-
entry.data.mem().cloned(),
2450-
entry.outboard_size(),
2451-
entry.outboard.mem().cloned(),
2452-
));
2453-
entry
2454-
}
2455-
Err(entry) => {
2456-
// the entry was already complete, nothing to do
2457-
entry
2458-
}
2459-
};
2460-
Ok(BaoFileStorage::Complete(entry))
2461-
})?;
2441+
entry
2442+
.transform(async |state| {
2443+
tracing::trace!("on_complete transform {:?}", state);
2444+
let entry = match complete_storage(
2445+
state,
2446+
&hash,
2447+
&self.options.path,
2448+
&self.options.inline,
2449+
tables.delete_after_commit,
2450+
self.fs.clone(),
2451+
)
2452+
.await?
2453+
{
2454+
Ok(entry) => {
2455+
// store the info so we can insert it into the db later
2456+
info = Some((
2457+
entry.data_size(),
2458+
entry.data.mem().cloned(),
2459+
entry.outboard_size(),
2460+
entry.outboard.mem().cloned(),
2461+
));
2462+
entry
2463+
}
2464+
Err(entry) => {
2465+
// the entry was already complete, nothing to do
2466+
entry
2467+
}
2468+
};
2469+
Ok(BaoFileStorage::Complete(entry))
2470+
})
2471+
.await?;
24622472
if let Some((data_size, data, outboard_size, outboard)) = info {
24632473
let data_location = if data.is_some() {
24642474
DataLocation::Inline(())
@@ -2588,7 +2598,7 @@ where
25882598
) -> ActorResult<std::result::Result<(), ActorMessage<T::File>>> {
25892599
match msg {
25902600
ActorMessage::Import { cmd, tx } => {
2591-
let res = self.import(tables, cmd);
2601+
let res = self.import(tables, cmd).await;
25922602
tx.send(res).ok();
25932603
}
25942604
ActorMessage::SetTag { tag, value, tx } => {
@@ -2616,7 +2626,7 @@ where
26162626
tx.send(res).ok();
26172627
}
26182628
ActorMessage::OnComplete { handle } => {
2619-
let res = self.on_complete(tables, handle);
2629+
let res = self.on_complete(tables, handle).await;
26202630
res.ok();
26212631
}
26222632
ActorMessage::Export { cmd, tx } => {
@@ -2798,7 +2808,7 @@ async fn load_outboard<T: Persistence>(
27982808
}
27992809

28002810
/// Take a possibly incomplete storage and turn it into complete
2801-
fn complete_storage<T>(
2811+
async fn complete_storage<T>(
28022812
storage: BaoFileStorage<T::File>,
28032813
hash: &Hash,
28042814
path_options: &PathOptions,
@@ -2865,13 +2875,12 @@ where
28652875
)
28662876
}
28672877
MemOrFile::File(data) => MemOrFile::File(
2868-
block_for(
2869-
FileAndSize {
2870-
file: data,
2871-
size: data_size,
2872-
}
2873-
.map_async(move |f| fs_2.convert_std_file(f)),
2874-
)
2878+
FileAndSize {
2879+
file: data,
2880+
size: data_size,
2881+
}
2882+
.map_async(move |f| fs_2.convert_std_file(f))
2883+
.await
28752884
.transpose()
28762885
.map_err(|e| ActorError::Io(e.into()))?,
28772886
),

0 commit comments

Comments
 (0)