Skip to content

Commit 802f9f7

Browse files
committed
fix all tests
1 parent 03af8b8 commit 802f9f7

File tree

2 files changed

+69
-76
lines changed

2 files changed

+69
-76
lines changed

src/store/bao_file.rs

+18-34
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{
1414
io,
1515
ops::{Deref, DerefMut},
1616
path::{Path, PathBuf},
17-
sync::{Arc, RwLock, Weak},
17+
sync::{Arc, Weak},
1818
};
1919

2020
use bao_tree::{
@@ -343,7 +343,7 @@ impl<T> BaoFileHandleWeak<T> {
343343
/// The inner part of a bao file handle.
344344
#[derive(Debug)]
345345
pub struct BaoFileHandleInner<T> {
346-
pub(crate) storage: RwLock<BaoFileStorage<T>>,
346+
pub(crate) storage: tokio::sync::RwLock<BaoFileStorage<T>>,
347347
config: Arc<BaoFileConfig>,
348348
hash: Hash,
349349
}
@@ -432,15 +432,9 @@ where
432432
return res;
433433
}
434434
};
435-
// otherwise, we have to spawn a task.
436-
let (handle, res) = tokio::task::spawn_blocking(move || {
437-
let storage = handle.storage.read().unwrap();
438-
let res = f(storage.deref());
439-
drop(storage);
440-
(handle, res)
441-
})
442-
.await
443-
.expect("spawn_blocking failed");
435+
let storage_guard = handle.storage.read().await;
436+
let res = f(storage_guard.deref());
437+
drop(storage_guard);
444438
*opt = Some(handle);
445439
res
446440
}
@@ -528,7 +522,7 @@ where
528522
pub fn incomplete_mem(config: Arc<BaoFileConfig>, hash: Hash) -> Self {
529523
let storage = BaoFileStorage::incomplete_mem();
530524
Self(Arc::new(BaoFileHandleInner {
531-
storage: RwLock::new(storage),
525+
storage: tokio::sync::RwLock::new(storage),
532526
config,
533527
hash,
534528
}))
@@ -543,7 +537,7 @@ where
543537
sizes: create_read_write(&paths.sizes)?,
544538
});
545539
Ok(Self(Arc::new(BaoFileHandleInner {
546-
storage: RwLock::new(storage),
540+
storage: tokio::sync::RwLock::new(storage),
547541
config,
548542
hash,
549543
})))
@@ -558,7 +552,7 @@ where
558552
) -> Self {
559553
let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard });
560554
Self(Arc::new(BaoFileHandleInner {
561-
storage: RwLock::new(storage),
555+
storage: tokio::sync::RwLock::new(storage),
562556
config,
563557
hash,
564558
}))
@@ -567,20 +561,20 @@ where
567561
/// Transform the storage in place. If the transform fails, the storage will
568562
/// be an immutable empty storage.
569563
#[cfg(feature = "fs-store")]
570-
pub(crate) fn transform(
564+
pub(crate) async fn transform(
571565
&self,
572-
f: impl FnOnce(BaoFileStorage<T>) -> io::Result<BaoFileStorage<T>>,
566+
f: impl AsyncFnOnce(BaoFileStorage<T>) -> io::Result<BaoFileStorage<T>>,
573567
) -> io::Result<()> {
574-
let mut lock = self.storage.write().unwrap();
568+
let mut lock = self.storage.write().await;
575569
let storage = lock.take();
576-
*lock = f(storage)?;
570+
*lock = f(storage).await?;
577571
Ok(())
578572
}
579573

580574
/// True if the file is complete.
581575
pub fn is_complete(&self) -> bool {
582576
matches!(
583-
self.storage.read().unwrap().deref(),
577+
self.storage.try_read().unwrap().deref(),
584578
BaoFileStorage::Complete(_)
585579
)
586580
}
@@ -603,7 +597,7 @@ where
603597

604598
/// The most precise known total size of the data file.
605599
pub fn current_size(&self) -> io::Result<u64> {
606-
match self.storage.read().unwrap().deref() {
600+
match self.storage.try_read().unwrap().deref() {
607601
BaoFileStorage::Complete(mem) => Ok(mem.data_size()),
608602
BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()),
609603
BaoFileStorage::IncompleteFile(file) => file.current_size(),
@@ -633,8 +627,8 @@ where
633627
}
634628

635629
/// This is the synchronous impl for writing a batch.
636-
fn write_batch(&self, size: u64, batch: &[BaoContentItem]) -> io::Result<HandleChange> {
637-
let mut storage = self.storage.write().unwrap();
630+
async fn write_batch(&self, size: u64, batch: &[BaoContentItem]) -> io::Result<HandleChange> {
631+
let mut storage = self.storage.write().await;
638632
match storage.deref_mut() {
639633
BaoFileStorage::IncompleteMem(mem) => {
640634
// check if we need to switch to file mode, otherwise write to memory
@@ -730,12 +724,7 @@ where
730724
let Some(handle) = self.0.take() else {
731725
return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
732726
};
733-
let (handle, change) = tokio::task::spawn_blocking(move || {
734-
let change = handle.write_batch(size, &batch);
735-
(handle, change)
736-
})
737-
.await
738-
.expect("spawn_blocking failed");
727+
let change = handle.write_batch(size, &batch).await;
739728
match change? {
740729
HandleChange::None => {}
741730
HandleChange::MemToFile => {
@@ -752,12 +741,7 @@ where
752741
let Some(handle) = self.0.take() else {
753742
return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
754743
};
755-
let (handle, res) = tokio::task::spawn_blocking(move || {
756-
let res = handle.storage.write().unwrap().sync_all();
757-
(handle, res)
758-
})
759-
.await
760-
.expect("spawn_blocking failed");
744+
let res = handle.storage.write().await.sync_all();
761745
self.0 = Some(handle);
762746
res
763747
}

src/store/fs.rs

+51-42
Original file line numberDiff line numberDiff line change
@@ -1105,7 +1105,7 @@ where
11051105
.into(),
11061106
)
11071107
})?;
1108-
block_for(self.fs.create_dir_all(parent))?;
1108+
self.fs.create_dir_all(parent).await?;
11091109
let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
11101110
let (tx, rx) = oneshot::channel();
11111111
self.tx
@@ -2006,7 +2006,11 @@ where
20062006
Ok(())
20072007
}
20082008

2009-
fn import(&mut self, tables: &mut Tables, cmd: Import) -> ActorResult<(TempTag, u64)> {
2009+
async fn import(
2010+
&mut self,
2011+
tables: &mut Tables<'_>,
2012+
cmd: Import,
2013+
) -> ActorResult<(TempTag, u64)> {
20102014
let Import {
20112015
content_id,
20122016
source: file,
@@ -2031,7 +2035,9 @@ where
20312035
external_path.display()
20322036
);
20332037
let data = Bytes::from(
2034-
block_for(self.fs.read(&external_path))
2038+
self.fs
2039+
.read(&external_path)
2040+
.await
20352041
.map_err(|e| ActorError::Io(e.into()))?,
20362042
);
20372043
DataLocation::Inline(data)
@@ -2423,41 +2429,45 @@ where
24232429
Ok(())
24242430
}
24252431

2426-
fn on_complete(
2432+
async fn on_complete(
24272433
&mut self,
2428-
tables: &mut Tables,
2434+
tables: &mut Tables<'_>,
24292435
entry: BaoFileHandle<T::File>,
24302436
) -> ActorResult<()> {
24312437
let hash = entry.hash();
24322438
let mut info = None;
24332439
tracing::trace!("on_complete({})", hash.to_hex());
2434-
entry.transform(|state| {
2435-
tracing::trace!("on_complete transform {:?}", state);
2436-
let entry = match complete_storage(
2437-
state,
2438-
&hash,
2439-
&self.options.path,
2440-
&self.options.inline,
2441-
tables.delete_after_commit,
2442-
self.fs.clone(),
2443-
)? {
2444-
Ok(entry) => {
2445-
// store the info so we can insert it into the db later
2446-
info = Some((
2447-
entry.data_size(),
2448-
entry.data.mem().cloned(),
2449-
entry.outboard_size(),
2450-
entry.outboard.mem().cloned(),
2451-
));
2452-
entry
2453-
}
2454-
Err(entry) => {
2455-
// the entry was already complete, nothing to do
2456-
entry
2457-
}
2458-
};
2459-
Ok(BaoFileStorage::Complete(entry))
2460-
})?;
2440+
entry
2441+
.transform(async |state| {
2442+
tracing::trace!("on_complete transform {:?}", state);
2443+
let entry = match complete_storage(
2444+
state,
2445+
&hash,
2446+
&self.options.path,
2447+
&self.options.inline,
2448+
tables.delete_after_commit,
2449+
self.fs.clone(),
2450+
)
2451+
.await?
2452+
{
2453+
Ok(entry) => {
2454+
// store the info so we can insert it into the db later
2455+
info = Some((
2456+
entry.data_size(),
2457+
entry.data.mem().cloned(),
2458+
entry.outboard_size(),
2459+
entry.outboard.mem().cloned(),
2460+
));
2461+
entry
2462+
}
2463+
Err(entry) => {
2464+
// the entry was already complete, nothing to do
2465+
entry
2466+
}
2467+
};
2468+
Ok(BaoFileStorage::Complete(entry))
2469+
})
2470+
.await?;
24612471
if let Some((data_size, data, outboard_size, outboard)) = info {
24622472
let data_location = if data.is_some() {
24632473
DataLocation::Inline(())
@@ -2587,7 +2597,7 @@ where
25872597
) -> ActorResult<std::result::Result<(), ActorMessage<T::File>>> {
25882598
match msg {
25892599
ActorMessage::Import { cmd, tx } => {
2590-
let res = self.import(tables, cmd);
2600+
let res = self.import(tables, cmd).await;
25912601
tx.send(res).ok();
25922602
}
25932603
ActorMessage::SetTag { tag, value, tx } => {
@@ -2615,7 +2625,7 @@ where
26152625
tx.send(res).ok();
26162626
}
26172627
ActorMessage::OnComplete { handle } => {
2618-
let res = self.on_complete(tables, handle);
2628+
let res = self.on_complete(tables, handle).await;
26192629
res.ok();
26202630
}
26212631
ActorMessage::Export { cmd, tx } => {
@@ -2797,7 +2807,7 @@ async fn load_outboard<T: Persistence>(
27972807
}
27982808

27992809
/// Take a possibly incomplete storage and turn it into complete
2800-
fn complete_storage<T>(
2810+
async fn complete_storage<T>(
28012811
storage: BaoFileStorage<T::File>,
28022812
hash: &Hash,
28032813
path_options: &PathOptions,
@@ -2864,13 +2874,12 @@ where
28642874
)
28652875
}
28662876
MemOrFile::File(data) => MemOrFile::File(
2867-
block_for(
2868-
FileAndSize {
2869-
file: data,
2870-
size: data_size,
2871-
}
2872-
.map_async(move |f| fs_2.convert_std_file(f)),
2873-
)
2877+
FileAndSize {
2878+
file: data,
2879+
size: data_size,
2880+
}
2881+
.map_async(move |f| fs_2.convert_std_file(f))
2882+
.await
28742883
.transpose()
28752884
.map_err(|e| ActorError::Io(e.into()))?,
28762885
),

0 commit comments

Comments
 (0)