diff --git a/shotover-proxy/src/server.rs b/shotover-proxy/src/server.rs index 6bf9547ab..6a774c050 100644 --- a/shotover-proxy/src/server.rs +++ b/shotover-proxy/src/server.rs @@ -3,7 +3,7 @@ use crate::tls::TlsAcceptor; use crate::transforms::chain::TransformChain; use crate::transforms::Wrapper; use anyhow::{anyhow, Context, Result}; -use futures::SinkExt; +use futures::{SinkExt, StreamExt}; use metrics::{register_gauge, Gauge}; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; @@ -13,7 +13,6 @@ use tokio::sync::{mpsc, watch, Semaphore}; use tokio::time; use tokio::time::timeout; use tokio::time::Duration; -use tokio_stream::StreamExt as TokioStreamExt; use tokio_util::codec::{Decoder, Encoder}; use tokio_util::codec::{FramedRead, FramedWrite}; use tracing::Instrument; @@ -396,7 +395,7 @@ fn spawn_read_write_tasks< async move { loop { tokio::select! { - result = TokioStreamExt::next(&mut reader) => { + result = reader.next() => { if let Some(message) = result { match message { Ok(message) => { diff --git a/shotover-proxy/src/transforms/cassandra/connection.rs b/shotover-proxy/src/transforms/cassandra/connection.rs index f445456bd..35f19688d 100644 --- a/shotover-proxy/src/transforms/cassandra/connection.rs +++ b/shotover-proxy/src/transforms/cassandra/connection.rs @@ -18,7 +18,6 @@ use tokio::net::{TcpStream, ToSocketAddrs}; use tokio::sync::{mpsc, oneshot}; use tokio::time::timeout; use tokio_stream::wrappers::UnboundedReceiverStream; -use tokio_stream::StreamExt as TokioStreamExt; use tokio_util::codec::{FramedRead, FramedWrite}; use tracing::{info, Instrument}; @@ -90,7 +89,7 @@ async fn tx_process( codec: C, ) -> Result<()> { let in_w = FramedWrite::new(write, codec); - let rx_stream = TokioStreamExt::map(UnboundedReceiverStream::new(out_rx), |x| { + let rx_stream = UnboundedReceiverStream::new(out_rx).map(|x| { let ret = Ok(vec![x.message.clone()]); return_tx.send(x)?; ret @@ -112,7 +111,7 @@ async fn rx_process( loop { tokio::select! { - Some(maybe_req) = TokioStreamExt::next(&mut in_r) => { + Some(maybe_req) = in_r.next() => { match maybe_req { Ok(req) => { for m in req { @@ -196,7 +195,7 @@ pub async fn receive_message( failed_requests: &metrics::Counter, results: &mut FuturesOrdered>, ) -> Result { - match tokio_stream::StreamExt::next(results).await { + match results.next().await { Some(result) => match result? { Response { response: Ok(message), diff --git a/shotover-proxy/src/transforms/util/cluster_connection_pool.rs b/shotover-proxy/src/transforms/util/cluster_connection_pool.rs index ba58001aa..ec3246ff0 100644 --- a/shotover-proxy/src/transforms/util/cluster_connection_pool.rs +++ b/shotover-proxy/src/transforms/util/cluster_connection_pool.rs @@ -18,7 +18,6 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::Mutex; use tokio::time::timeout; use tokio_stream::wrappers::UnboundedReceiverStream; -use tokio_stream::StreamExt as TokioStreamExt; use tokio_util::codec::{FramedRead, FramedWrite}; use tracing::{debug, trace, warn, Instrument}; @@ -246,7 +245,7 @@ async fn tx_process( codec: C, ) -> Result<()> { let in_w = FramedWrite::new(write, codec); - let rx_stream = TokioStreamExt::map(UnboundedReceiverStream::new(out_rx), |x| { + let rx_stream = UnboundedReceiverStream::new(out_rx).map(|x| { let ret = Ok(vec![x.message.clone()]); return_tx.send(x)?; ret @@ -261,7 +260,7 @@ async fn rx_process( ) -> Result<()> { let mut in_r = FramedRead::new(read, codec); - while let Some(maybe_req) = TokioStreamExt::next(&mut in_r).await { + while let Some(maybe_req) = in_r.next().await { match maybe_req { Ok(req) => { for m in req { diff --git a/shotover-proxy/tests/codec/cassandra.rs b/shotover-proxy/tests/codec/cassandra.rs index c024d0483..f9fdcf88f 100644 --- a/shotover-proxy/tests/codec/cassandra.rs +++ b/shotover-proxy/tests/codec/cassandra.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use futures::sink::SinkExt; +use futures::SinkExt; use serial_test::serial; use shotover_proxy::codec::cassandra::CassandraCodec; use tokio::io::BufWriter;