diff --git a/shotover-proxy/benches/benches/codec.rs b/shotover-proxy/benches/benches/codec.rs index 738260f14..ab980a386 100644 --- a/shotover-proxy/benches/benches/codec.rs +++ b/shotover-proxy/benches/benches/codec.rs @@ -4,7 +4,8 @@ use cassandra_protocol::frame::message_result::{ }; use cassandra_protocol::frame::Version; use criterion::{black_box, criterion_group, BatchSize, Criterion}; -use shotover_proxy::codec::cassandra::CassandraCodec; +use shotover_proxy::codec::cassandra::CassandraCodecBuilder; +use shotover_proxy::codec::CodecBuilder; use shotover_proxy::frame::cassandra::{parse_statement_single, Tracing}; use shotover_proxy::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame}; use shotover_proxy::message::Message; @@ -27,14 +28,14 @@ fn criterion_benchmark(c: &mut Criterion) { }, }))]; - let mut codec = CassandraCodec::new(); + let (_, mut encoder) = CassandraCodecBuilder::new().build(); group.bench_function("encode_cassandra_system.local_query", |b| { b.iter_batched( || messages.clone(), |messages| { let mut bytes = BytesMut::new(); - codec.encode(messages, &mut bytes).unwrap(); + encoder.encode(messages, &mut bytes).unwrap(); black_box(bytes) }, BatchSize::SmallInput, @@ -51,14 +52,14 @@ fn criterion_benchmark(c: &mut Criterion) { operation: CassandraOperation::Result(peers_v2_result()), }))]; - let mut codec = CassandraCodec::new(); + let (_, mut encoder) = CassandraCodecBuilder::new().build(); group.bench_function("encode_cassandra_system.local_result", |b| { b.iter_batched( || messages.clone(), |messages| { let mut bytes = BytesMut::new(); - codec.encode(messages, &mut bytes).unwrap(); + encoder.encode(messages, &mut bytes).unwrap(); black_box(bytes) }, BatchSize::SmallInput, diff --git a/shotover-proxy/src/codec/cassandra.rs b/shotover-proxy/src/codec/cassandra.rs index f0a918f82..fd58f9da5 100644 --- a/shotover-proxy/src/codec/cassandra.rs +++ b/shotover-proxy/src/codec/cassandra.rs @@ -1,4 +1,4 @@ -use crate::codec::{Codec, CodecReadError}; +use crate::codec::{CodecBuilder, CodecReadError}; use crate::frame::cassandra::{CassandraMetadata, CassandraOperation, Tracing}; use crate::frame::{CassandraFrame, Frame, MessageType}; use crate::message::{Encodable, Message, Messages, Metadata}; @@ -17,40 +17,44 @@ use std::sync::RwLock; use tokio_util::codec::{Decoder, Encoder}; use tracing::info; -#[derive(Debug, Clone)] -pub struct CassandraCodec { - compression: Arc>, - messages: Vec, - current_use_keyspace: Option, -} +#[derive(Clone, Default)] +pub struct CassandraCodecBuilder {} -impl Default for CassandraCodec { - fn default() -> Self { - CassandraCodec::new() +impl CassandraCodecBuilder { + pub fn new() -> Self { + Self::default() } } -impl CassandraCodec { - pub fn new() -> CassandraCodec { - CassandraCodec { - compression: Arc::new(RwLock::new(Compression::None)), - messages: vec![], - current_use_keyspace: None, - } +impl CodecBuilder for CassandraCodecBuilder { + type Decoder = CassandraDecoder; + type Encoder = CassandraEncoder; + fn build(&self) -> (CassandraDecoder, CassandraEncoder) { + let compression = Arc::new(RwLock::new(Compression::None)); + ( + CassandraDecoder::new(compression.clone()), + CassandraEncoder::new(compression), + ) } } -impl Codec for CassandraCodec { - fn clone_without_state(&self) -> Self { - Self { - compression: Arc::new(RwLock::new(Compression::None)), - messages: self.messages.clone(), - current_use_keyspace: self.current_use_keyspace.clone(), +pub struct CassandraDecoder { + compression: Arc>, + messages: Vec, + current_use_keyspace: Option, +} + +impl CassandraDecoder { + pub fn new(compression: Arc>) -> CassandraDecoder { + CassandraDecoder { + compression, + messages: vec![], + current_use_keyspace: None, } } } -impl CassandraCodec { +impl CassandraDecoder { fn check_compression(&mut self, bytes: &BytesMut) -> Result { if bytes.len() < 9 { return Err(anyhow!("Not enough bytes for cassandra frame")); @@ -66,28 +70,28 @@ impl CassandraCodec { .. } = CassandraFrame::from_bytes(bytes.clone().freeze(), Compression::None)? { - self.set_compression(&startup); + set_compression(&mut self.compression, &startup); }; } Ok(compressed) } +} - fn set_compression(&mut self, startup: &BodyReqStartup) { - if let Some(compression) = startup.map.get("COMPRESSION") { - let mut write = self.compression.as_ref().write().unwrap(); +fn set_compression(compression_state: &mut Arc>, startup: &BodyReqStartup) { + if let Some(compression) = startup.map.get("COMPRESSION") { + let mut write = compression_state.write().unwrap(); - *write = match compression.as_str() { - "snappy" | "SNAPPY" => Compression::Snappy, - "lz4" | "LZ4" => Compression::Lz4, - "" | "none" | "NONE" => Compression::None, - _ => panic!(), - }; - } + *write = match compression.as_str() { + "snappy" | "SNAPPY" => Compression::Snappy, + "lz4" | "LZ4" => Compression::Lz4, + "" | "none" | "NONE" => Compression::None, + _ => panic!(), + }; } } -impl Decoder for CassandraCodec { +impl Decoder for CassandraDecoder { type Item = Messages; type Error = CodecReadError; @@ -247,7 +251,17 @@ fn reject_protocol_version(version: u8) -> CodecReadError { ))]) } -impl Encoder for CassandraCodec { +pub struct CassandraEncoder { + compression: Arc>, +} + +impl CassandraEncoder { + pub fn new(compression: Arc>) -> CassandraEncoder { + CassandraEncoder { compression } + } +} + +impl Encoder for CassandraEncoder { type Error = anyhow::Error; fn encode( @@ -271,7 +285,7 @@ impl Encoder for CassandraCodec { .. } = CassandraFrame::from_bytes(bytes.clone(), Compression::None)? { - self.set_compression(&startup); + set_compression(&mut self.compression, &startup); }; } } @@ -285,7 +299,7 @@ impl Encoder for CassandraCodec { .. }) = &frame { - self.set_compression(startup); + set_compression(&mut self.compression, startup); }; let buffer = frame.into_cassandra().unwrap().encode(compression); @@ -304,7 +318,8 @@ impl Encoder for CassandraCodec { #[cfg(test)] mod cassandra_protocol_tests { - use crate::codec::cassandra::CassandraCodec; + use crate::codec::cassandra::CassandraCodecBuilder; + use crate::codec::CodecBuilder; use crate::frame::cassandra::{ parse_statement_single, CassandraFrame, CassandraOperation, CassandraResult, Tracing, }; @@ -324,12 +339,13 @@ mod cassandra_protocol_tests { use tokio_util::codec::{Decoder, Encoder}; fn test_frame_codec_roundtrip( - codec: &mut CassandraCodec, + codec: &mut CassandraCodecBuilder, raw_frame: &[u8], expected_messages: Vec, ) { + let (mut decoder, mut encoder) = codec.build(); // test decode - let decoded_messages = codec + let decoded_messages = decoder .decode(&mut BytesMut::from(raw_frame)) .unwrap() .unwrap(); @@ -346,21 +362,21 @@ mod cassandra_protocol_tests { // test encode round trip - parsed messages { let mut dest = BytesMut::new(); - codec.encode(parsed_messages, &mut dest).unwrap(); + encoder.encode(parsed_messages, &mut dest).unwrap(); assert_eq!(raw_frame, &dest.to_vec()); } // test encode round trip - raw messages { let mut dest = BytesMut::new(); - codec.encode(decoded_messages, &mut dest).unwrap(); + encoder.encode(decoded_messages, &mut dest).unwrap(); assert_eq!(raw_frame, &dest.to_vec()); } } #[test] fn test_codec_startup() { - let mut codec = CassandraCodec::new(); + let mut codec = CassandraCodecBuilder::new(); let mut startup_body: HashMap = HashMap::new(); startup_body.insert("CQL_VERSION".into(), "3.0.0".into()); let bytes = hex!("0400000001000000160001000b43514c5f56455253494f4e0005332e302e30"); @@ -376,7 +392,7 @@ mod cassandra_protocol_tests { #[test] fn test_codec_options() { - let mut codec = CassandraCodec::new(); + let mut codec = CassandraCodecBuilder::new(); let bytes = hex!("040000000500000000"); let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { version: Version::V4, @@ -390,7 +406,7 @@ mod cassandra_protocol_tests { #[test] fn test_codec_ready() { - let mut codec = CassandraCodec::new(); + let mut codec = CassandraCodecBuilder::new(); let bytes = hex!("840000000200000000"); let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { version: Version::V4, @@ -404,7 +420,7 @@ mod cassandra_protocol_tests { #[test] fn test_codec_register() { - let mut codec = CassandraCodec::new(); + let mut codec = CassandraCodecBuilder::new(); let bytes = hex!( "040000010b000000310003000f544f504f4c4f47595f4348414e4745 000d5354415455535f4348414e4745000d534348454d415f4348414e4745" @@ -427,7 +443,7 @@ mod cassandra_protocol_tests { #[test] fn test_codec_result() { - let mut codec = CassandraCodec::new(); + let mut codec = CassandraCodecBuilder::new(); let bytes = hex!( "840000020800000099000000020000000100000009000673797374656 d000570656572730004706565720010000b646174615f63656e746572000d0007686f73745f6964000c000c70726566 @@ -535,7 +551,7 @@ mod cassandra_protocol_tests { #[test] fn test_codec_query_select() { - let mut codec = CassandraCodec::new(); + let mut codec = CassandraCodecBuilder::new(); let bytes = hex!( "0400000307000000350000002e53454c454354202a2046524f4d20737973 74656d2e6c6f63616c205748455245206b6579203d20276c6f63616c27000100" @@ -558,7 +574,7 @@ mod cassandra_protocol_tests { #[test] fn test_codec_query_insert() { - let mut codec = CassandraCodec::new(); + let mut codec = CassandraCodecBuilder::new(); let bytes = hex!( "0400000307000000330000002c494e5345525420494e544f207379737465 6d2e666f6f2028626172292056414c554553202827626172322729000100" diff --git a/shotover-proxy/src/codec/kafka.rs b/shotover-proxy/src/codec/kafka.rs index 4f148a594..e1e3b1fbc 100644 --- a/shotover-proxy/src/codec/kafka.rs +++ b/shotover-proxy/src/codec/kafka.rs @@ -1,10 +1,18 @@ -use crate::codec::{Codec, CodecReadError}; +use crate::codec::{CodecBuilder, CodecReadError}; use crate::frame::MessageType; use crate::message::{Encodable, Message, Messages, ProtocolType}; use anyhow::Result; use bytes::{Buf, BytesMut}; use tokio_util::codec::{Decoder, Encoder}; +impl CodecBuilder for KafkaCodec { + type Decoder = KafkaCodec; + type Encoder = KafkaCodec; + fn build(&self) -> (KafkaCodec, KafkaCodec) { + (KafkaCodec::new(), KafkaCodec::new()) + } +} + #[derive(Debug, Clone)] pub struct KafkaCodec { messages: Messages, @@ -16,8 +24,6 @@ impl Default for KafkaCodec { } } -impl Codec for KafkaCodec {} - impl KafkaCodec { pub fn new() -> KafkaCodec { KafkaCodec { messages: vec![] } diff --git a/shotover-proxy/src/codec/mod.rs b/shotover-proxy/src/codec/mod.rs index 6096a114a..827d22d9a 100644 --- a/shotover-proxy/src/codec/mod.rs +++ b/shotover-proxy/src/codec/mod.rs @@ -41,16 +41,15 @@ impl From for CodecReadError { } // TODO: Replace with trait_alias (rust-lang/rust#41517). -pub trait CodecReadHalf: Decoder + Clone + Send {} -impl + Clone + Send> CodecReadHalf for T {} +pub trait DecoderHalf: Decoder + Send {} +impl + Send> DecoderHalf for T {} // TODO: Replace with trait_alias (rust-lang/rust#41517). -pub trait CodecWriteHalf: Encoder + Clone + Send {} -impl + Clone + Send> CodecWriteHalf for T {} +pub trait EncoderHalf: Encoder + Send {} +impl + Send> EncoderHalf for T {} -// TODO: Replace with trait_alias (rust-lang/rust#41517). -pub trait Codec: CodecReadHalf + CodecWriteHalf + Sized + Clone { - fn clone_without_state(&self) -> Self { - self.clone() - } +pub trait CodecBuilder: Clone + Send { + type Decoder: DecoderHalf; + type Encoder: EncoderHalf; + fn build(&self) -> (Self::Decoder, Self::Encoder); } diff --git a/shotover-proxy/src/codec/redis.rs b/shotover-proxy/src/codec/redis.rs index af99d1494..c80ee0735 100644 --- a/shotover-proxy/src/codec/redis.rs +++ b/shotover-proxy/src/codec/redis.rs @@ -1,4 +1,4 @@ -use crate::codec::{Codec, CodecReadError}; +use crate::codec::{CodecBuilder, CodecReadError}; use crate::frame::RedisFrame; use crate::frame::{Frame, MessageType}; use crate::message::{Encodable, Message, Messages, QueryType}; @@ -8,6 +8,14 @@ use redis_protocol::resp2::prelude::decode_mut; use redis_protocol::resp2::prelude::encode_bytes; use tokio_util::codec::{Decoder, Encoder}; +impl CodecBuilder for RedisCodec { + type Decoder = RedisCodec; + type Encoder = RedisCodec; + fn build(&self) -> (RedisCodec, RedisCodec) { + (RedisCodec::new(), RedisCodec::new()) + } +} + #[derive(Debug, Clone)] pub struct RedisCodec { messages: Messages, @@ -36,8 +44,6 @@ impl Default for RedisCodec { } } -impl Codec for RedisCodec {} - impl RedisCodec { pub fn new() -> RedisCodec { RedisCodec { messages: vec![] } diff --git a/shotover-proxy/src/server.rs b/shotover-proxy/src/server.rs index 61074b7a9..09a17a30c 100644 --- a/shotover-proxy/src/server.rs +++ b/shotover-proxy/src/server.rs @@ -1,4 +1,4 @@ -use crate::codec::{Codec, CodecReadError}; +use crate::codec::{CodecBuilder, CodecReadError}; use crate::message::Messages; use crate::tls::{AcceptError, TlsAcceptor}; use crate::transforms::chain::{TransformChain, TransformChainBuilder}; @@ -21,7 +21,7 @@ use tokio_util::codec::{FramedRead, FramedWrite}; use tracing::Instrument; use tracing::{debug, error, info, warn}; -pub struct TcpCodecListener { +pub struct TcpCodecListener { chain: TransformChainBuilder, source_name: String, @@ -65,7 +65,7 @@ pub struct TcpCodecListener { connection_handles: Vec>, } -impl TcpCodecListener { +impl TcpCodecListener { #![allow(clippy::too_many_arguments)] pub async fn new( chain: TransformChainBuilder, @@ -195,7 +195,7 @@ impl TcpCodecListener { // The connection state needs a handle to the max connections // semaphore. When the handler is done processing the // connection, a permit is added back to the semaphore. - codec: self.codec.clone_without_state(), + codec: self.codec.clone(), limit_connections: self.limit_connections.clone(), // Receive shutdown notifications. @@ -276,7 +276,7 @@ async fn create_listener(listen_addr: &str) -> Result { .map_err(|e| anyhow!("{} address={}", e, listen_addr)) } -pub struct Handler { +pub struct Handler { chain: TransformChain, client_details: String, conn_details: String, @@ -310,7 +310,7 @@ pub struct Handler { } fn spawn_read_write_tasks< - C: Codec + 'static, + C: CodecBuilder + 'static, R: AsyncRead + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static, >( @@ -322,8 +322,9 @@ fn spawn_read_write_tasks< out_tx: UnboundedSender, mut terminate_tasks_rx: watch::Receiver<()>, ) { - let mut reader = FramedRead::new(rx, codec.clone()); - let mut writer = FramedWrite::new(tx, codec); + let (decoder, encoder) = codec.build(); + let mut reader = FramedRead::new(rx, decoder); + let mut writer = FramedWrite::new(tx, encoder); // Shutdown flows // @@ -407,7 +408,7 @@ fn spawn_read_write_tasks< ); } -impl Handler { +impl Handler { /// Process a single connection. /// /// Request frames are read from the socket and processed. Responses are @@ -564,7 +565,7 @@ impl Handler { } } -impl Drop for Handler { +impl Drop for Handler { fn drop(&mut self) { // Add a permit back to the semaphore. // diff --git a/shotover-proxy/src/sources/cassandra_source.rs b/shotover-proxy/src/sources/cassandra_source.rs index c69eda85d..ca8c3a88c 100644 --- a/shotover-proxy/src/sources/cassandra_source.rs +++ b/shotover-proxy/src/sources/cassandra_source.rs @@ -1,4 +1,4 @@ -use crate::codec::cassandra::CassandraCodec; +use crate::codec::cassandra::CassandraCodecBuilder; use crate::server::TcpCodecListener; use crate::sources::Sources; use crate::tls::{TlsAcceptor, TlsAcceptorConfig}; @@ -67,7 +67,7 @@ impl CassandraSource { name.to_string(), listen_addr.clone(), hard_connection_limit.unwrap_or(false), - CassandraCodec::new(), + CassandraCodecBuilder::new(), Arc::new(Semaphore::new(connection_limit.unwrap_or(512))), trigger_shutdown_rx.clone(), tls.map(TlsAcceptor::new).transpose()?, diff --git a/shotover-proxy/src/transforms/cassandra/connection.rs b/shotover-proxy/src/transforms/cassandra/connection.rs index e2c0ac281..974fe4554 100644 --- a/shotover-proxy/src/transforms/cassandra/connection.rs +++ b/shotover-proxy/src/transforms/cassandra/connection.rs @@ -1,5 +1,5 @@ -use crate::codec::cassandra::CassandraCodec; -use crate::codec::CodecReadError; +use crate::codec::cassandra::{CassandraCodecBuilder, CassandraDecoder, CassandraEncoder}; +use crate::codec::{CodecBuilder, CodecReadError}; use crate::frame::cassandra::CassandraMetadata; use crate::frame::{CassandraFrame, Frame}; use crate::message::{Message, Metadata}; @@ -67,7 +67,7 @@ impl CassandraConnection { pub async fn new( connect_timeout: Duration, host: A, - codec: CassandraCodec, + codec: CassandraCodecBuilder, mut tls: Option, pushed_messages_tx: Option>, ) -> Result { @@ -77,6 +77,7 @@ impl CassandraConnection { let destination = tokio::net::lookup_host(&host).await?.next().unwrap(); + let (decoder, encoder) = codec.build(); if let Some(tls) = tls.as_mut() { let tls_stream = tls.connect(connect_timeout, host).await?; let (read, write) = split(tls_stream); @@ -85,7 +86,7 @@ impl CassandraConnection { write, out_rx, return_tx, - codec.clone(), + encoder, rx_process_has_shutdown_rx, destination, ) @@ -95,7 +96,7 @@ impl CassandraConnection { rx_process( read, return_rx, - codec.clone(), + decoder, pushed_messages_tx, rx_process_has_shutdown_tx, destination, @@ -110,7 +111,7 @@ impl CassandraConnection { write, out_rx, return_tx, - codec.clone(), + encoder, rx_process_has_shutdown_rx, destination, ) @@ -120,7 +121,7 @@ impl CassandraConnection { rx_process( read, return_rx, - codec.clone(), + decoder, pushed_messages_tx, rx_process_has_shutdown_tx, destination, @@ -169,7 +170,7 @@ async fn tx_process( write: WriteHalf, mut out_rx: mpsc::UnboundedReceiver, return_tx: mpsc::UnboundedSender, - codec: CassandraCodec, + codec: CassandraEncoder, mut rx_process_has_shutdown_rx: oneshot::Receiver, // Only used for error reporting destination: SocketAddr, @@ -245,7 +246,7 @@ fn send_error_to_request( async fn rx_process( read: ReadHalf, mut return_rx: mpsc::UnboundedReceiver, - codec: CassandraCodec, + codec: CassandraDecoder, pushed_messages_tx: Option>, rx_process_has_shutdown_tx: oneshot::Sender, // Only used for error reporting diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs index c95f4b8e3..305594604 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs @@ -1,4 +1,4 @@ -use crate::codec::cassandra::CassandraCodec; +use crate::codec::cassandra::CassandraCodecBuilder; use crate::frame::Frame; use crate::message::{Message, Messages}; use crate::tls::{TlsConnector, ToHostname}; @@ -107,7 +107,7 @@ impl ConnectionFactory { let outbound = CassandraConnection::new( self.connect_timeout, address, - CassandraCodec::new(), + CassandraCodecBuilder::new(), self.tls.clone(), self.pushed_messages_tx.clone(), ) diff --git a/shotover-proxy/src/transforms/cassandra/sink_single.rs b/shotover-proxy/src/transforms/cassandra/sink_single.rs index 68b681b72..5b2cd5e66 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_single.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_single.rs @@ -1,5 +1,5 @@ use super::connection::CassandraConnection; -use crate::codec::cassandra::CassandraCodec; +use crate::codec::cassandra::CassandraCodecBuilder; use crate::error::ChainResponse; use crate::frame::cassandra::CassandraMetadata; use crate::message::{Messages, Metadata}; @@ -131,7 +131,7 @@ impl CassandraSinkSingle { CassandraConnection::new( self.connect_timeout, self.address.clone(), - CassandraCodec::new(), + CassandraCodecBuilder::new(), self.tls.clone(), self.pushed_messages_tx.clone(), ) diff --git a/shotover-proxy/src/transforms/util/cluster_connection_pool.rs b/shotover-proxy/src/transforms/util/cluster_connection_pool.rs index d077cddc5..5a4346e4d 100644 --- a/shotover-proxy/src/transforms/util/cluster_connection_pool.rs +++ b/shotover-proxy/src/transforms/util/cluster_connection_pool.rs @@ -1,5 +1,5 @@ use super::Response; -use crate::codec::{Codec, CodecReadHalf, CodecWriteHalf}; +use crate::codec::{CodecBuilder, DecoderHalf, EncoderHalf}; use crate::tcp; use crate::tls::{TlsConnector, TlsConnectorConfig}; use crate::transforms::util::{ConnectionError, Request}; @@ -48,7 +48,7 @@ impl Token for T {} #[derive(Clone, Derivative)] #[derivative(Debug)] -pub struct ConnectionPool, T: Token> { +pub struct ConnectionPool, T: Token> { connect_timeout: Duration, lanes: Arc, Lane>>>, @@ -62,7 +62,7 @@ pub struct ConnectionPool, T: Token> { tls: Option, } -impl, T: Token> ConnectionPool { +impl, T: Token> ConnectionPool { pub fn new_with_auth( connect_timeout: Duration, codec: C, @@ -184,7 +184,7 @@ impl, T: Token> ConnectionPool } pub fn spawn_read_write_tasks< - C: Codec + 'static, + C: CodecBuilder + 'static, R: AsyncRead + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static, >( @@ -196,11 +196,11 @@ pub fn spawn_read_write_tasks< let (return_tx, return_rx) = tokio::sync::mpsc::unbounded_channel::(); let (closed_tx, closed_rx) = tokio::sync::oneshot::channel(); - let codec_clone = codec.clone(); + let (decoder, encoder) = codec.build(); tokio::spawn(async move { tokio::select! { - result = tx_process(stream_tx, out_rx, return_tx, codec_clone) => if let Err(e) = result { + result = tx_process(stream_tx, out_rx, return_tx, encoder) => if let Err(e) = result { trace!("connection write-closed with error: {:?}", e); } else { trace!("connection write-closed gracefully"); @@ -211,11 +211,9 @@ pub fn spawn_read_write_tasks< } }.in_current_span()); - let codec_clone = codec.clone(); - tokio::spawn( async move { - if let Err(e) = rx_process(stream_rx, return_rx, codec_clone).await { + if let Err(e) = rx_process(stream_rx, return_rx, decoder).await { trace!("connection read-closed with error: {:?}", e); } else { trace!("connection read-closed gracefully"); @@ -230,7 +228,7 @@ pub fn spawn_read_write_tasks< out_tx } -async fn tx_process( +async fn tx_process( write: W, out_rx: UnboundedReceiver, return_tx: UnboundedSender, @@ -245,7 +243,7 @@ async fn tx_process( rx_stream.forward(writer).await } -async fn rx_process( +async fn rx_process( read: R, mut return_rx: UnboundedReceiver, codec: C,