diff --git a/postgres-protocol/src/lib.rs b/postgres-protocol/src/lib.rs index f65ed84f3..93f02eaf1 100644 --- a/postgres-protocol/src/lib.rs +++ b/postgres-protocol/src/lib.rs @@ -67,7 +67,7 @@ macro_rules! from_usize { impl FromUsize for $t { #[inline] fn from_usize(x: usize) -> io::Result<$t> { - if x > <$t>::max_value() as usize { + if x > <$t>::MAX as usize { Err(io::Error::new( io::ErrorKind::InvalidInput, "value too large to transmit", diff --git a/postgres-types/src/lib.rs b/postgres-types/src/lib.rs index d45fe7456..a63ed0ddc 100644 --- a/postgres-types/src/lib.rs +++ b/postgres-types/src/lib.rs @@ -1222,7 +1222,7 @@ impl ToSql for IpAddr { } fn downcast(len: usize) -> Result> { - if len > i32::max_value() as usize { + if len > i32::MAX as usize { Err("value too large to transmit".into()) } else { Ok(len as i32) diff --git a/postgres-types/src/special.rs b/postgres-types/src/special.rs index 1a865287e..d8541bf0e 100644 --- a/postgres-types/src/special.rs +++ b/postgres-types/src/special.rs @@ -1,7 +1,6 @@ use bytes::BytesMut; use postgres_protocol::types; use std::error::Error; -use std::{i32, i64}; use crate::{FromSql, IsNull, ToSql, Type}; diff --git a/tokio-postgres/src/client.rs b/tokio-postgres/src/client.rs index c55a81574..5b5ab7686 100644 --- a/tokio-postgres/src/client.rs +++ b/tokio-postgres/src/client.rs @@ -1,4 +1,4 @@ -use crate::codec::{BackendMessages, FrontendMessage}; +use crate::codec::BackendMessages; use crate::config::SslMode; use crate::connection::{Request, RequestMessages}; use crate::copy_both::CopyBothDuplex; @@ -23,7 +23,7 @@ use fallible_iterator::FallibleIterator; use futures_channel::mpsc; use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt}; use parking_lot::Mutex; -use postgres_protocol::message::{backend::Message, frontend}; +use postgres_protocol::message::backend::Message; use postgres_types::BorrowToSql; use std::collections::HashMap; use std::fmt; @@ -493,43 +493,7 @@ impl Client { /// /// The transaction will roll back by default - use the `commit` method to commit it. pub async fn transaction(&mut self) -> Result, Error> { - struct RollbackIfNotDone<'me> { - client: &'me Client, - done: bool, - } - - impl<'a> Drop for RollbackIfNotDone<'a> { - fn drop(&mut self) { - if self.done { - return; - } - - let buf = self.client.inner().with_buf(|buf| { - frontend::query("ROLLBACK", buf).unwrap(); - buf.split().freeze() - }); - let _ = self - .client - .inner() - .send(RequestMessages::Single(FrontendMessage::Raw(buf))); - } - } - - // This is done, as `Future` created by this method can be dropped after - // `RequestMessages` is synchronously send to the `Connection` by - // `batch_execute()`, but before `Responses` is asynchronously polled to - // completion. In that case `Transaction` won't be created and thus - // won't be rolled back. - { - let mut cleaner = RollbackIfNotDone { - client: self, - done: false, - }; - self.batch_execute("BEGIN").await?; - cleaner.done = true; - } - - Ok(Transaction::new(self)) + self.build_transaction().start().await } /// Returns a builder for a transaction with custom settings. diff --git a/tokio-postgres/src/generic_client.rs b/tokio-postgres/src/generic_client.rs index 50cff9712..88ed2c8c4 100644 --- a/tokio-postgres/src/generic_client.rs +++ b/tokio-postgres/src/generic_client.rs @@ -66,10 +66,10 @@ pub trait GenericClient: private::Sealed { parameter_types: &[Type], ) -> Result; - /// Like `Client::transaction`. - async fn transaction(&mut self) -> Result, Error>; + /// Like [`Client::transaction`]. + async fn transaction<'a>(&'a mut self) -> Result, Error>; - /// Like `Client::batch_execute`. + /// Like [`Client::batch_execute`]. async fn batch_execute(&self, query: &str) -> Result<(), Error>; /// Returns a reference to the underlying `Client`. @@ -148,7 +148,7 @@ impl GenericClient for Client { self.prepare_typed(query, parameter_types).await } - async fn transaction(&mut self) -> Result, Error> { + async fn transaction<'a>(&'a mut self) -> Result, Error> { self.transaction().await } diff --git a/tokio-postgres/src/transaction_builder.rs b/tokio-postgres/src/transaction_builder.rs index 9718ac588..88c883176 100644 --- a/tokio-postgres/src/transaction_builder.rs +++ b/tokio-postgres/src/transaction_builder.rs @@ -1,4 +1,6 @@ -use crate::{Client, Error, Transaction}; +use postgres_protocol::message::frontend; + +use crate::{codec::FrontendMessage, connection::RequestMessages, Client, Error, Transaction}; /// The isolation level of a database transaction. #[derive(Debug, Copy, Clone)] @@ -106,7 +108,41 @@ impl<'a> TransactionBuilder<'a> { query.push_str(s); } - self.client.batch_execute(&query).await?; + struct RollbackIfNotDone<'me> { + client: &'me Client, + done: bool, + } + + impl Drop for RollbackIfNotDone<'_> { + fn drop(&mut self) { + if self.done { + return; + } + + let buf = self.client.inner().with_buf(|buf| { + frontend::query("ROLLBACK", buf).unwrap(); + buf.split().freeze() + }); + let _ = self + .client + .inner() + .send(RequestMessages::Single(FrontendMessage::Raw(buf))); + } + } + + // This is done as `Future` created by this method can be dropped after + // `RequestMessages` is synchronously send to the `Connection` by + // `batch_execute()`, but before `Responses` is asynchronously polled to + // completion. In that case `Transaction` won't be created and thus + // won't be rolled back. + { + let mut cleaner = RollbackIfNotDone { + client: self.client, + done: false, + }; + self.client.batch_execute(&query).await?; + cleaner.done = true; + } Ok(Transaction::new(self.client)) }