Skip to content
This repository has been archived by the owner on Nov 7, 2024. It is now read-only.

Add a helper to flatten Result<Result<T>> with tokio_util #210

Merged
merged 1 commit into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/src/container/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ impl LayeredImageImporter {
// Destructure to transfer ownership to thread
let repo = self.repo;
let imgref = self.target_imgref.unwrap_or(self.imgref);
let state = crate::tokio_util::spawn_blocking_cancellable(
let state = crate::tokio_util::spawn_blocking_cancellable_flatten(
move |cancellable| -> Result<LayeredImageState> {
let cancellable = Some(cancellable);
let repo = &repo;
Expand Down Expand Up @@ -401,7 +401,7 @@ impl LayeredImageImporter {
Ok(state)
},
)
.await??;
.await?;
Ok(state)
}
}
Expand Down Expand Up @@ -481,7 +481,7 @@ pub async fn copy(
let ostree_ref = ostree_ref?;
let src_repo = src_repo.clone();
let dest_repo = dest_repo.clone();
crate::tokio_util::spawn_blocking_cancellable(move |cancellable| -> Result<_> {
crate::tokio_util::spawn_blocking_cancellable_flatten(move |cancellable| -> Result<_> {
let cancellable = Some(cancellable);
let srcfd = &format!("file:///proc/self/fd/{}", src_repo.dfd());
let flags = ostree::RepoPullFlags::MIRROR;
Expand All @@ -495,7 +495,7 @@ pub async fn copy(
dest_repo.pull_with_options(srcfd, &options, None, cancellable)?;
Ok(())
})
.await??;
.await?;
}
Ok(())
}
Expand Down
8 changes: 3 additions & 5 deletions lib/src/tar/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use anyhow::{anyhow, Context};
use camino::Utf8Path;
use camino::Utf8PathBuf;
use fn_error_context::context;
use futures_util::TryFutureExt;
use gio::glib;
use gio::prelude::*;
use glib::Variant;
Expand Down Expand Up @@ -599,17 +598,16 @@ pub async fn import_tar(
let options = options.unwrap_or_default();
let src = tokio_util::io::SyncIoBridge::new(src);
let repo = repo.clone();
let import = crate::tokio_util::spawn_blocking_cancellable(move |cancellable| {
let import = crate::tokio_util::spawn_blocking_cancellable_flatten(move |cancellable| {
let mut archive = tar::Archive::new(src);
let txn = repo.auto_transaction(Some(cancellable))?;
let importer = Importer::new(&repo, options.remote);
let checksum = importer.import(&mut archive, Some(cancellable))?;
txn.commit(Some(cancellable))?;
repo.mark_commit_partial(&checksum, false)?;
Ok::<_, anyhow::Error>(checksum)
})
.map_err(anyhow::Error::msg);
let import: String = import.await??;
});
let import: String = import.await?;
Ok(import)
}

Expand Down
25 changes: 24 additions & 1 deletion lib/src/tokio_util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! Helpers for bridging GLib async/mainloop with Tokio.

use anyhow::Result;
use futures_util::Future;
use core::fmt::{Debug, Display};
use futures_util::{Future, FutureExt};
use ostree::gio;
use ostree::prelude::CancellableExt;

Expand Down Expand Up @@ -48,6 +49,28 @@ where
f(&dropper.0)
})
}

/// Flatten a nested Result<Result<T>>, defaulting to converting the error type to an `anyhow::Error`.
/// See https://doc.rust-lang.org/std/result/enum.Result.html#method.flatten
pub(crate) fn flatten_anyhow<T, E>(r: std::result::Result<Result<T>, E>) -> Result<T>
where
E: Display + Debug + Send + Sync + 'static,
{
match r {
Ok(x) => x,
Err(e) => Err(anyhow::anyhow!(e)),
}
}

/// A wrapper around [`spawn_blocking_cancellable`] that flattens nested results.
pub fn spawn_blocking_cancellable_flatten<F, T>(f: F) -> impl Future<Output = Result<T>>
where
F: FnOnce(&gio::Cancellable) -> Result<T> + Send + 'static,
T: Send + 'static,
{
spawn_blocking_cancellable(f).map(flatten_anyhow)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down