Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.

Commit 78f5c02

Browse files
conradludgatepimeys
authored andcommitted
simple query ready for query (sfackler#22)
* add ready_status on simple queries * add correct socket2 features
1 parent 0d8c716 commit 78f5c02

File tree

10 files changed

+78
-25
lines changed

10 files changed

+78
-25
lines changed

postgres/src/client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,9 @@ impl Client {
419419
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
420420
/// them to this method!
421421
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
422-
self.connection.block_on(self.client.batch_execute(query))
422+
self.connection
423+
.block_on(self.client.batch_execute(query))
424+
.map(|_| ())
423425
}
424426

425427
/// Begins a new database transaction.

postgres/src/transaction.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ impl<'a> Transaction<'a> {
3535
pub fn commit(mut self) -> Result<(), Error> {
3636
self.connection
3737
.block_on(self.transaction.take().unwrap().commit())
38+
.map(|_| ())
3839
}
3940

4041
/// Rolls the transaction back, discarding all changes made within it.
@@ -43,6 +44,7 @@ impl<'a> Transaction<'a> {
4344
pub fn rollback(mut self) -> Result<(), Error> {
4445
self.connection
4546
.block_on(self.transaction.take().unwrap().rollback())
47+
.map(|_| ())
4648
}
4749

4850
/// Like `Client::prepare`.
@@ -193,6 +195,7 @@ impl<'a> Transaction<'a> {
193195
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
194196
self.connection
195197
.block_on(self.transaction.as_ref().unwrap().batch_execute(query))
198+
.map(|_| ())
196199
}
197200

198201
/// Like `Client::cancel_token`.

tokio-postgres/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ postgres-protocol = { version = "0.6.6", path = "../postgres-protocol" }
5858
postgres-types = { version = "0.2.5", path = "../postgres-types" }
5959
tokio = { version = "1.27", features = ["io-util"] }
6060
tokio-util = { version = "0.7", features = ["codec"] }
61+
socket2 = { version = "0.5", features = ["all"] }
6162
rand = "0.8.5"
6263
whoami = "1.4.1"
6364

tokio-postgres/src/client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ use crate::types::{Oid, ToSql, Type};
1515
use crate::Socket;
1616
use crate::{
1717
copy_both, copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken,
18-
CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
19-
TransactionBuilder,
18+
CopyInSink, Error, ReadyForQueryStatus, Row, SimpleQueryMessage, Statement, ToStatement,
19+
Transaction, TransactionBuilder,
2020
};
2121
use bytes::{Buf, BytesMut};
2222
use fallible_iterator::FallibleIterator;
@@ -506,7 +506,7 @@ impl Client {
506506
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
507507
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
508508
/// them to this method!
509-
pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
509+
pub async fn batch_execute(&self, query: &str) -> Result<ReadyForQueryStatus, Error> {
510510
simple_query::batch_execute(self.inner(), query).await
511511
}
512512

tokio-postgres/src/generic_client.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ impl GenericClient for Client {
175175
}
176176

177177
async fn batch_execute(&self, query: &str) -> Result<(), Error> {
178-
self.batch_execute(query).await
178+
self.batch_execute(query).await?;
179+
Ok(())
179180
}
180181

181182
fn client(&self) -> &Client {
@@ -272,7 +273,8 @@ impl GenericClient for Transaction<'_> {
272273
}
273274

274275
async fn batch_execute(&self, query: &str) -> Result<(), Error> {
275-
self.batch_execute(query).await
276+
self.batch_execute(query).await?;
277+
Ok(())
276278
}
277279

278280
fn client(&self) -> &Client {

tokio-postgres/src/lib.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@
118118
//! | `with-time-0_3` | Enable support for the 0.3 version of the `time` crate. | [time](https://crates.io/crates/time/0.3.0) 0.3 | no |
119119
#![warn(rust_2018_idioms, clippy::all, missing_docs)]
120120

121+
use postgres_protocol::message::backend::ReadyForQueryBody;
122+
121123
pub use crate::cancel_token::CancelToken;
122124
pub use crate::client::Client;
123125
pub use crate::config::Config;
@@ -142,6 +144,31 @@ pub use crate::transaction::Transaction;
142144
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
143145
use crate::types::ToSql;
144146

147+
/// After executing a query, the connection will be in one of these states
148+
#[derive(Clone, Copy, Debug, PartialEq)]
149+
#[repr(u8)]
150+
pub enum ReadyForQueryStatus {
151+
/// Connection state is unknown
152+
Unknown,
153+
/// Connection is idle (no transactions)
154+
Idle = b'I',
155+
/// Connection is in a transaction block
156+
Transaction = b'T',
157+
/// Connection is in a failed transaction block
158+
FailedTransaction = b'E',
159+
}
160+
161+
impl From<ReadyForQueryBody> for ReadyForQueryStatus {
162+
fn from(value: ReadyForQueryBody) -> Self {
163+
match value.status() {
164+
b'I' => Self::Idle,
165+
b'T' => Self::Transaction,
166+
b'E' => Self::FailedTransaction,
167+
_ => Self::Unknown,
168+
}
169+
}
170+
}
171+
145172
pub mod binary_copy;
146173
mod bind;
147174
#[cfg(feature = "runtime")]

tokio-postgres/src/query.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
44
use crate::types::{BorrowToSql, IsNull};
5-
use crate::{Error, Portal, Row, Statement};
5+
use crate::{Error, Portal, ReadyForQueryStatus, Row, Statement};
66
use bytes::{BufMut, Bytes, BytesMut};
77
use futures_util::{ready, Stream};
88
use log::{debug, log_enabled, Level};
@@ -55,7 +55,7 @@ where
5555
statement,
5656
responses,
5757
command_tag: None,
58-
status: None,
58+
status: ReadyForQueryStatus::Unknown,
5959
output_format: Format::Binary,
6060
_p: PhantomPinned,
6161
})
@@ -109,7 +109,7 @@ where
109109
statement,
110110
responses,
111111
command_tag: None,
112-
status: None,
112+
status: ReadyForQueryStatus::Unknown,
113113
output_format: Format::Text,
114114
_p: PhantomPinned,
115115
})
@@ -132,7 +132,7 @@ pub async fn query_portal(
132132
statement: portal.statement().clone(),
133133
responses,
134134
command_tag: None,
135-
status: None,
135+
status: ReadyForQueryStatus::Unknown,
136136
output_format: Format::Binary,
137137
_p: PhantomPinned,
138138
})
@@ -269,7 +269,7 @@ pin_project! {
269269
responses: Responses,
270270
command_tag: Option<String>,
271271
output_format: Format,
272-
status: Option<u8>,
272+
status: ReadyForQueryStatus,
273273
#[pin]
274274
_p: PhantomPinned,
275275
}
@@ -296,7 +296,7 @@ impl Stream for RowStream {
296296
}
297297
}
298298
Message::ReadyForQuery(status) => {
299-
*this.status = Some(status.status());
299+
*this.status = status.into();
300300
return Poll::Ready(None);
301301
}
302302
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
@@ -316,7 +316,7 @@ impl RowStream {
316316
/// Returns if the connection is ready for querying, with the status of the connection.
317317
///
318318
/// This might be available only after the stream has been exhausted.
319-
pub fn ready_status(&self) -> Option<u8> {
319+
pub fn ready_status(&self) -> ReadyForQueryStatus {
320320
self.status
321321
}
322322
}

tokio-postgres/src/simple_query.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
44
use crate::query::extract_row_affected;
5-
use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
5+
use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
66
use bytes::Bytes;
77
use fallible_iterator::FallibleIterator;
88
use futures_util::{ready, Stream};
@@ -41,19 +41,23 @@ pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQue
4141
Ok(SimpleQueryStream {
4242
responses,
4343
columns: None,
44+
status: ReadyForQueryStatus::Unknown,
4445
_p: PhantomPinned,
4546
})
4647
}
4748

48-
pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Error> {
49+
pub async fn batch_execute(
50+
client: &InnerClient,
51+
query: &str,
52+
) -> Result<ReadyForQueryStatus, Error> {
4953
debug!("executing statement batch: {}", query);
5054

5155
let buf = encode(client, query)?;
5256
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
5357

5458
loop {
5559
match responses.next().await? {
56-
Message::ReadyForQuery(_) => return Ok(()),
60+
Message::ReadyForQuery(status) => return Ok(status.into()),
5761
Message::CommandComplete(_)
5862
| Message::EmptyQueryResponse
5963
| Message::RowDescription(_)
@@ -75,11 +79,21 @@ pin_project! {
7579
pub struct SimpleQueryStream {
7680
responses: Responses,
7781
columns: Option<Arc<[SimpleColumn]>>,
82+
status: ReadyForQueryStatus,
7883
#[pin]
7984
_p: PhantomPinned,
8085
}
8186
}
8287

88+
impl SimpleQueryStream {
89+
/// Returns if the connection is ready for querying, with the status of the connection.
90+
///
91+
/// This might be available only after the stream has been exhausted.
92+
pub fn ready_status(&self) -> ReadyForQueryStatus {
93+
self.status
94+
}
95+
}
96+
8397
impl Stream for SimpleQueryStream {
8498
type Item = Result<SimpleQueryMessage, Error>;
8599

@@ -111,7 +125,10 @@ impl Stream for SimpleQueryStream {
111125
};
112126
return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))));
113127
}
114-
Message::ReadyForQuery(_) => return Poll::Ready(None),
128+
Message::ReadyForQuery(s) => {
129+
*this.status = s.into();
130+
return Poll::Ready(None);
131+
}
115132
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
116133
}
117134
}

tokio-postgres/src/transaction.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use crate::types::{BorrowToSql, ToSql, Type};
99
#[cfg(feature = "runtime")]
1010
use crate::Socket;
1111
use crate::{
12-
bind, query, slice_iter, CancelToken, Client, CopyInSink, Error, Portal, Row,
13-
SimpleQueryMessage, Statement, ToStatement,
12+
bind, query, slice_iter, CancelToken, Client, CopyInSink, Error, Portal, ReadyForQueryStatus,
13+
Row, SimpleQueryMessage, Statement, ToStatement,
1414
};
1515
use bytes::Buf;
1616
use futures_util::TryStreamExt;
@@ -65,7 +65,7 @@ impl<'a> Transaction<'a> {
6565
}
6666

6767
/// Consumes the transaction, committing all changes made within it.
68-
pub async fn commit(mut self) -> Result<(), Error> {
68+
pub async fn commit(mut self) -> Result<ReadyForQueryStatus, Error> {
6969
self.done = true;
7070
let query = if let Some(sp) = self.savepoint.as_ref() {
7171
format!("RELEASE {}", sp.name)
@@ -78,7 +78,7 @@ impl<'a> Transaction<'a> {
7878
/// Rolls the transaction back, discarding all changes made within it.
7979
///
8080
/// This is equivalent to `Transaction`'s `Drop` implementation, but provides any error encountered to the caller.
81-
pub async fn rollback(mut self) -> Result<(), Error> {
81+
pub async fn rollback(mut self) -> Result<ReadyForQueryStatus, Error> {
8282
self.done = true;
8383
let query = if let Some(sp) = self.savepoint.as_ref() {
8484
format!("ROLLBACK TO {}", sp.name)
@@ -261,7 +261,7 @@ impl<'a> Transaction<'a> {
261261
}
262262

263263
/// Like `Client::batch_execute`.
264-
pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
264+
pub async fn batch_execute(&self, query: &str) -> Result<ReadyForQueryStatus, Error> {
265265
self.client.batch_execute(query).await
266266
}
267267

tokio-postgres/tests/test/main.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ use tokio_postgres::error::SqlState;
1616
use tokio_postgres::tls::{NoTls, NoTlsStream};
1717
use tokio_postgres::types::{Kind, Type};
1818
use tokio_postgres::{
19-
AsyncMessage, Client, Config, Connection, Error, IsolationLevel, SimpleQueryMessage,
19+
AsyncMessage, Client, Config, Connection, Error, IsolationLevel, ReadyForQueryStatus,
20+
SimpleQueryMessage,
2021
};
2122

2223
mod binary_copy;
@@ -365,7 +366,7 @@ async fn ready_for_query() {
365366
pin_mut!(row_stream);
366367
while row_stream.next().await.is_none() {}
367368

368-
assert_eq!(row_stream.ready_status(), Some(b'T'));
369+
assert_eq!(row_stream.ready_status(), ReadyForQueryStatus::Transaction);
369370

370371
let row_stream = client
371372
.query_raw_txt("ROLLBACK", [] as [Option<&str>; 0])
@@ -375,7 +376,7 @@ async fn ready_for_query() {
375376
pin_mut!(row_stream);
376377
while row_stream.next().await.is_none() {}
377378

378-
assert_eq!(row_stream.ready_status(), Some(b'I'));
379+
assert_eq!(row_stream.ready_status(), ReadyForQueryStatus::Idle);
379380
}
380381

381382
#[tokio::test]

0 commit comments

Comments
 (0)