Skip to content

Commit

Permalink
Use ready! rustc 1.64 macro (#3315)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill Bulatov authored Jan 12, 2023
1 parent bb406b2 commit fe8cef3
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 29 deletions.
32 changes: 13 additions & 19 deletions libs/utils/src/postgres_backend_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use crate::postgres_backend::AuthType;
use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use pq_proto::{BeMessage, ConnectionError, FeMessage, FeStartupPacket, SQLSTATE_INTERNAL_ERROR};
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::{future::Future, task::ready};
use tracing::{debug, error, info, trace};

use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
Expand Down Expand Up @@ -253,12 +253,9 @@ impl PostgresBackend {
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
while self.buf_out.has_remaining() {
match Pin::new(&mut self.stream).poll_write(cx, self.buf_out.chunk()) {
Poll::Ready(Ok(bytes_written)) => {
self.buf_out.advance(bytes_written);
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
match ready!(Pin::new(&mut self.stream).poll_write(cx, self.buf_out.chunk())) {
Ok(bytes_written) => self.buf_out.advance(bytes_written),
Err(err) => return Poll::Ready(Err(err)),
}
}
Poll::Ready(Ok(()))
Expand Down Expand Up @@ -573,10 +570,9 @@ impl<'a> AsyncWrite for CopyDataWriter<'a> {
// It's not strictly required to flush between each message, but makes it easier
// to view in wireshark, and usually the messages that the callers write are
// decently-sized anyway.
match this.pgb.poll_write_buf(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
match ready!(this.pgb.poll_write_buf(cx)) {
Ok(()) => {}
Err(err) => return Poll::Ready(Err(err)),
}

// CopyData
Expand All @@ -593,10 +589,9 @@ impl<'a> AsyncWrite for CopyDataWriter<'a> {
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let this = self.get_mut();
match this.pgb.poll_write_buf(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
match ready!(this.pgb.poll_write_buf(cx)) {
Ok(()) => {}
Err(err) => return Poll::Ready(Err(err)),
}
this.pgb.poll_flush(cx)
}
Expand All @@ -605,10 +600,9 @@ impl<'a> AsyncWrite for CopyDataWriter<'a> {
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let this = self.get_mut();
match this.pgb.poll_write_buf(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
match ready!(this.pgb.poll_write_buf(cx)) {
Ok(()) => {}
Err(err) => return Poll::Ready(Err(err)),
}
this.pgb.poll_flush(cx)
}
Expand Down
18 changes: 8 additions & 10 deletions proxy/src/http/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::convert::Infallible;
use std::future::ready;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use tls_listener::TlsListener;

use tokio::io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
Expand Down Expand Up @@ -104,10 +104,9 @@ impl AsyncRead for WebSocketRW {
return Poll::Ready(Ok(()));
}

let inner_buf = match self.as_mut().poll_fill_buf(cx) {
Poll::Ready(Ok(buf)) => buf,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
let inner_buf = match ready!(self.as_mut().poll_fill_buf(cx)) {
Ok(buf) => buf,
Err(err) => return Poll::Ready(Err(err)),
};
let len = std::cmp::min(inner_buf.len(), buf.remaining());
buf.put_slice(&inner_buf[..len]);
Expand All @@ -124,8 +123,8 @@ impl AsyncBufRead for WebSocketRW {
let buf = self.project().chunk.as_ref().unwrap().chunk();
return Poll::Ready(Ok(buf));
} else {
match self.as_mut().project().stream.poll_next(cx) {
Poll::Ready(Some(Ok(message))) => match message {
match ready!(self.as_mut().project().stream.poll_next(cx)) {
Some(Ok(message)) => match message {
Message::Text(_) => {}
Message::Binary(chunk) => {
*self.as_mut().project().chunk = Some(Bytes::from(chunk));
Expand All @@ -142,9 +141,8 @@ impl AsyncBufRead for WebSocketRW {
unreachable!();
}
},
Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(ws_err_into(err))),
Poll::Ready(None) => return Poll::Ready(Ok(&[])),
Poll::Pending => return Poll::Pending,
Some(Err(err)) => return Poll::Ready(Err(ws_err_into(err))),
None => return Poll::Ready(Ok(&[])),
}
}
}
Expand Down

0 comments on commit fe8cef3

Please sign in to comment.