Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add usage of scoped futures to transactions: ownership of this change transferred to @weiznich #38

Merged
merged 9 commits into from
Nov 12, 2022
61 changes: 42 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mysql_common = {version = "0.29.0", optional = true}
bb8 = {version = "0.8", optional = true}
deadpool = {version = "0.9", optional = true}
mobc = {version = "0.7", optional = true}
scoped-futures = {version = "0.1", features = ["std"]}

[dev-dependencies]
tokio = {version = "1.12.0", features = ["rt", "macros", "rt-multi-thread"]}
Expand Down
18 changes: 10 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ use diesel::backend::Backend;
use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
use diesel::row::Row;
use diesel::{ConnectionResult, QueryResult};
use futures::future::BoxFuture;
use futures::{Future, Stream};

pub use scoped_futures;
use scoped_futures::ScopedBoxFuture;

#[cfg(feature = "mysql")]
mod mysql;
#[cfg(feature = "postgres")]
Expand Down Expand Up @@ -180,7 +182,7 @@ where
/// ```rust
/// # include!("doctest_setup.rs");
/// use diesel::result::Error;
/// use futures::FutureExt;
/// use scoped_futures::ScopedFutureExt;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
Expand All @@ -200,7 +202,7 @@ where
/// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
///
/// Ok(())
/// }.boxed()).await?;
/// }.scope_boxed()).await?;
///
/// conn.transaction::<(), _, _>(|conn| async move {
/// diesel::insert_into(users)
Expand All @@ -214,18 +216,18 @@ where
/// // If we want to roll back the transaction, but don't have an
/// // actual error to return, we can return `RollbackTransaction`.
/// Err(Error::RollbackTransaction)
/// }.boxed()).await;
/// }.scope_boxed()).await;
///
/// let all_names = users.select(name).load::<String>(conn).await?;
/// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
/// # Ok(())
/// # }
/// ```
async fn transaction<R, E, F>(&mut self, callback: F) -> Result<R, E>
async fn transaction<'a, R, E, F>(&mut self, callback: F) -> Result<R, E>
where
F: FnOnce(&mut Self) -> BoxFuture<Result<R, E>> + Send,
E: From<diesel::result::Error> + Send,
R: Send,
F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
E: From<diesel::result::Error> + Send + 'a,
R: Send + 'a,
{
Self::TransactionManager::transaction(self, callback).await
}
Expand Down
4 changes: 2 additions & 2 deletions src/pg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ impl AsyncPgConnection {
///
/// ```rust
/// # include!("../doctest_setup.rs");
/// # use futures::FutureExt;
/// # use scoped_futures::ScopedFutureExt;
/// #
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
Expand All @@ -285,7 +285,7 @@ impl AsyncPgConnection {
/// .read_only()
/// .serializable()
/// .deferrable()
/// .run(|conn| async move { Ok(()) }.boxed())
/// .run(|conn| async move { Ok(()) }.scope_boxed())
/// .await
/// # }
/// ```
Expand Down
9 changes: 5 additions & 4 deletions src/pg/transaction_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use diesel::backend::Backend;
use diesel::pg::Pg;
use diesel::query_builder::{AstPass, QueryBuilder, QueryFragment};
use diesel::QueryResult;
use futures::future::BoxFuture;
use scoped_futures::ScopedBoxFuture;

/// Used to build a transaction, specifying additional details.
///
Expand Down Expand Up @@ -283,10 +283,11 @@ where
/// the original error will be returned, otherwise the error generated by the rollback
/// will be returned. In the second case the connection should be considered broken
/// as it contains a uncommitted unabortable open transaction.
pub async fn run<T, E, F>(&mut self, f: F) -> Result<T, E>
pub async fn run<'b, T, E, F>(&mut self, f: F) -> Result<T, E>
where
F: FnOnce(&mut C) -> BoxFuture<Result<T, E>> + Send,
E: From<diesel::result::Error>,
F: for<'r> FnOnce(&'r mut C) -> ScopedBoxFuture<'b, 'r, Result<T, E>> + Send + 'a,
T: 'b,
E: From<diesel::result::Error> + 'b,
{
let mut query_builder = <Pg as Backend>::QueryBuilder::default();
self.to_sql(&mut query_builder, &Pg)?;
Expand Down
12 changes: 5 additions & 7 deletions src/run_query_dsl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub mod methods {
U: Send,
T: AsQuery + Send,
T::SqlType: CompatibleType<U, DB, SqlType = ST>,
U: FromSqlRow<ST, DB> + Send + 'static,
U: FromSqlRow<ST, DB> + Send,
DB: QueryMetadata<T::SqlType>,
{
type LoadFuture = futures::future::MapOk<
Expand All @@ -115,13 +115,12 @@ pub mod methods {
where
Conn: AsyncConnection<Backend = DB>,
U: Send,
DB: Backend + 'static,
DB: Backend,
T: AsQuery + Send + 'query,
T::Query: QueryFragment<DB> + QueryId + Send + 'query,
T::SqlType: CompatibleType<U, DB, SqlType = ST>,
U: FromSqlRow<ST, DB> + Send + 'static,
U: FromSqlRow<ST, DB> + Send,
DB: QueryMetadata<T::SqlType>,
ST: 'static,
{
fn internal_load<'conn>(
self,
Expand All @@ -142,9 +141,8 @@ pub mod methods {
where
S: Stream<Item = QueryResult<R>> + Send + 's,
R: diesel::row::Row<'a, DB> + 's,
DB: Backend + 'static,
U: FromSqlRow<ST, DB> + 'static,
ST: 'static,
DB: Backend,
U: FromSqlRow<ST, DB>,
{
stream.map(map_row_helper::<_, DB, U, ST>)
}
Expand Down
6 changes: 3 additions & 3 deletions src/transaction_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use diesel::result::Error;
use diesel::QueryResult;
use futures::future::BoxFuture;
use scoped_futures::ScopedBoxFuture;
use std::borrow::Cow;
use std::num::NonZeroU32;

Expand Down Expand Up @@ -50,9 +50,9 @@ pub trait TransactionManager<Conn: AsyncConnection>: Send {
///
/// Each implementation of this function needs to fulfill the documented
/// behaviour of [`AsyncConnection::transaction`]
async fn transaction<F, R, E>(conn: &mut Conn, callback: F) -> Result<R, E>
async fn transaction<'a, F, R, E>(conn: &mut Conn, callback: F) -> Result<R, E>
where
F: FnOnce(&mut Conn) -> BoxFuture<Result<R, E>> + Send,
F: for<'r> FnOnce(&'r mut Conn) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
E: From<Error> + Send,
R: Send,
{
Expand Down
4 changes: 2 additions & 2 deletions tests/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use diesel::prelude::{ExpressionMethods, OptionalExtension, QueryDsl};
use diesel::QueryResult;
use diesel_async::*;
use futures::FutureExt;
use scoped_futures::ScopedFutureExt;
use std::fmt::Debug;
use std::pin::Pin;

Expand Down Expand Up @@ -31,7 +31,7 @@ async fn transaction_test(conn: &mut TestConnection) -> QueryResult<()> {
assert_eq!(count, 3);
Ok(())
}
.boxed()
.scope_boxed()
})
.await;
assert!(res.is_ok());
Expand Down