From d2753a8ca62d8ee9cf0820a10ea5fd81060b7419 Mon Sep 17 00:00:00 2001 From: Conor Brosnan Date: Tue, 7 Mar 2023 14:45:08 +1100 Subject: [PATCH] codec direction logging --- shotover-proxy/benches/benches/codec.rs | 6 +- shotover-proxy/src/codec/cassandra.rs | 54 ++++++++++-------- shotover-proxy/src/codec/kafka.rs | 55 +++++++++++-------- shotover-proxy/src/codec/mod.rs | 18 ++++++ shotover-proxy/src/codec/redis.rs | 49 ++++++++++------- .../src/sources/cassandra_source.rs | 5 +- shotover-proxy/src/sources/kafka.rs | 2 +- shotover-proxy/src/sources/redis_source.rs | 4 +- .../transforms/cassandra/sink_cluster/node.rs | 3 +- .../src/transforms/cassandra/sink_single.rs | 4 +- .../src/transforms/kafka/sink_single.rs | 2 +- .../transforms/redis/cluster_ports_rewrite.rs | 3 +- .../src/transforms/redis/sink_cluster.rs | 6 +- .../src/transforms/redis/sink_single.rs | 25 ++++----- .../util/cluster_connection_pool.rs | 5 +- 15 files changed, 146 insertions(+), 95 deletions(-) diff --git a/shotover-proxy/benches/benches/codec.rs b/shotover-proxy/benches/benches/codec.rs index ab980a386..4d453040c 100644 --- a/shotover-proxy/benches/benches/codec.rs +++ b/shotover-proxy/benches/benches/codec.rs @@ -5,7 +5,7 @@ use cassandra_protocol::frame::message_result::{ use cassandra_protocol::frame::Version; use criterion::{black_box, criterion_group, BatchSize, Criterion}; use shotover_proxy::codec::cassandra::CassandraCodecBuilder; -use shotover_proxy::codec::CodecBuilder; +use shotover_proxy::codec::{CodecBuilder, Direction}; use shotover_proxy::frame::cassandra::{parse_statement_single, Tracing}; use shotover_proxy::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame}; use shotover_proxy::message::Message; @@ -28,7 +28,7 @@ fn criterion_benchmark(c: &mut Criterion) { }, }))]; - let (_, mut encoder) = CassandraCodecBuilder::new().build(); + let (_, mut encoder) = CassandraCodecBuilder::new(Direction::Sink).build(); group.bench_function("encode_cassandra_system.local_query", |b| { b.iter_batched( @@ -52,7 +52,7 @@ fn criterion_benchmark(c: &mut Criterion) { operation: CassandraOperation::Result(peers_v2_result()), }))]; - let (_, mut encoder) = CassandraCodecBuilder::new().build(); + let (_, mut encoder) = CassandraCodecBuilder::new(Direction::Sink).build(); group.bench_function("encode_cassandra_system.local_result", |b| { b.iter_batched( diff --git a/shotover-proxy/src/codec/cassandra.rs b/shotover-proxy/src/codec/cassandra.rs index c74d96efa..7350f4e8c 100644 --- a/shotover-proxy/src/codec/cassandra.rs +++ b/shotover-proxy/src/codec/cassandra.rs @@ -1,3 +1,4 @@ +use super::Direction; use crate::codec::{CodecBuilder, CodecReadError}; use crate::frame::cassandra::{CassandraMetadata, CassandraOperation, Tracing}; use crate::frame::{CassandraFrame, Frame, MessageType}; @@ -17,23 +18,24 @@ use std::sync::RwLock; use tokio_util::codec::{Decoder, Encoder}; use tracing::info; -#[derive(Clone, Default)] -pub struct CassandraCodecBuilder {} - -impl CassandraCodecBuilder { - pub fn new() -> Self { - Self::default() - } +#[derive(Clone)] +pub struct CassandraCodecBuilder { + direction: Direction, } impl CodecBuilder for CassandraCodecBuilder { type Decoder = CassandraDecoder; type Encoder = CassandraEncoder; + + fn new(direction: Direction) -> Self { + Self { direction } + } + fn build(&self) -> (CassandraDecoder, CassandraEncoder) { let compression = Arc::new(RwLock::new(Compression::None)); ( - CassandraDecoder::new(compression.clone()), - CassandraEncoder::new(compression), + CassandraDecoder::new(compression.clone(), self.direction), + CassandraEncoder::new(compression, self.direction), ) } } @@ -42,14 +44,16 @@ pub struct CassandraDecoder { compression: Arc>, messages: Vec, current_use_keyspace: Option, + direction: Direction, } impl CassandraDecoder { - pub fn new(compression: Arc>) -> CassandraDecoder { + pub fn new(compression: Arc>, direction: Direction) -> CassandraDecoder { CassandraDecoder { compression, messages: vec![], current_use_keyspace: None, + direction, } } } @@ -102,7 +106,8 @@ impl Decoder for CassandraDecoder { // Clear the read bytes from the FramedReader let bytes = src.split_to(frame_len); tracing::debug!( - "incoming cassandra message:\n{}", + "{}: incoming cassandra message:\n{}", + self.direction, pretty_hex::pretty_hex(&bytes) ); @@ -253,11 +258,15 @@ fn reject_protocol_version(version: u8) -> CodecReadError { pub struct CassandraEncoder { compression: Arc>, + direction: Direction, } impl CassandraEncoder { - pub fn new(compression: Arc>) -> CassandraEncoder { - CassandraEncoder { compression } + pub fn new(compression: Arc>, direction: Direction) -> CassandraEncoder { + CassandraEncoder { + compression, + direction, + } } } @@ -308,7 +317,8 @@ impl Encoder for CassandraEncoder { } } tracing::debug!( - "outgoing cassandra message:\n{}", + "{}: outgoing cassandra message:\n{}", + self.direction, pretty_hex::pretty_hex(&&dst[start..]) ); } @@ -319,7 +329,7 @@ impl Encoder for CassandraEncoder { #[cfg(test)] mod cassandra_protocol_tests { use crate::codec::cassandra::CassandraCodecBuilder; - use crate::codec::CodecBuilder; + use crate::codec::{CodecBuilder, Direction}; use crate::frame::cassandra::{ parse_statement_single, CassandraFrame, CassandraOperation, CassandraResult, Tracing, }; @@ -376,7 +386,7 @@ mod cassandra_protocol_tests { #[test] fn test_codec_startup() { - let mut codec = CassandraCodecBuilder::new(); + let mut codec = CassandraCodecBuilder::new(Direction::Sink); let mut startup_body: HashMap = HashMap::new(); startup_body.insert("CQL_VERSION".into(), "3.0.0".into()); let bytes = hex!("0400000001000000160001000b43514c5f56455253494f4e0005332e302e30"); @@ -392,7 +402,7 @@ mod cassandra_protocol_tests { #[test] fn test_codec_options() { - let mut codec = CassandraCodecBuilder::new(); + let mut codec = CassandraCodecBuilder::new(Direction::Sink); let bytes = hex!("040000000500000000"); let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { version: Version::V4, @@ -406,7 +416,7 @@ mod cassandra_protocol_tests { #[test] fn test_codec_ready() { - let mut codec = CassandraCodecBuilder::new(); + let mut codec = CassandraCodecBuilder::new(Direction::Sink); let bytes = hex!("840000000200000000"); let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { version: Version::V4, @@ -420,7 +430,7 @@ mod cassandra_protocol_tests { #[test] fn test_codec_register() { - let mut codec = CassandraCodecBuilder::new(); + let mut codec = CassandraCodecBuilder::new(Direction::Sink); let bytes = hex!( "040000010b000000310003000f544f504f4c4f47595f4348414e4745 000d5354415455535f4348414e4745000d534348454d415f4348414e4745" @@ -443,7 +453,7 @@ mod cassandra_protocol_tests { #[test] fn test_codec_result() { - let mut codec = CassandraCodecBuilder::new(); + let mut codec = CassandraCodecBuilder::new(Direction::Sink); let bytes = hex!( "840000020800000099000000020000000100000009000673797374656 d000570656572730004706565720010000b646174615f63656e746572000d0007686f73745f6964000c000c70726566 @@ -551,7 +561,7 @@ mod cassandra_protocol_tests { #[test] fn test_codec_query_select() { - let mut codec = CassandraCodecBuilder::new(); + let mut codec = CassandraCodecBuilder::new(Direction::Sink); let bytes = hex!( "0400000307000000350000002e53454c454354202a2046524f4d20737973 74656d2e6c6f63616c205748455245206b6579203d20276c6f63616c27000100" @@ -574,7 +584,7 @@ mod cassandra_protocol_tests { #[test] fn test_codec_query_insert() { - let mut codec = CassandraCodecBuilder::new(); + let mut codec = CassandraCodecBuilder::new(Direction::Sink); let bytes = hex!( "0400000307000000330000002c494e5345525420494e544f207379737465 6d2e666f6f2028626172292056414c554553202827626172322729000100" diff --git a/shotover-proxy/src/codec/kafka.rs b/shotover-proxy/src/codec/kafka.rs index d9cbe8c05..9bbb40580 100644 --- a/shotover-proxy/src/codec/kafka.rs +++ b/shotover-proxy/src/codec/kafka.rs @@ -1,3 +1,4 @@ +use super::Direction; use crate::codec::{CodecBuilder, CodecReadError}; use crate::frame::MessageType; use crate::message::{Encodable, Message, Messages, ProtocolType}; @@ -7,16 +8,6 @@ use kafka_protocol::messages::ApiKey; use std::sync::mpsc; use tokio_util::codec::{Decoder, Encoder}; -/// Depending on if the codec is used in a sink or a source requires different processing logic: -/// * Sources parse requests which do not require any special handling -/// * Sinks parse responses which requires first matching up the version and api_key with its corresponding request -/// + To achieve this Sinks use an mpsc channel to send header data from the encoder to the decoder -#[derive(Copy, Clone)] -pub enum Direction { - Source, - Sink, -} - #[derive(Copy, Clone, Debug, PartialEq)] pub struct RequestHeader { pub api_key: ApiKey, @@ -28,15 +19,18 @@ pub struct KafkaCodecBuilder { direction: Direction, } -impl KafkaCodecBuilder { - pub fn new(direction: Direction) -> Self { - KafkaCodecBuilder { direction } - } -} - +// Depending on if the codec is used in a sink or a source requires different processing logic: +// * Sources parse requests which do not require any special handling +// * Sinks parse responses which requires first matching up the version and api_key with its corresponding request +// + To achieve this Sinks use an mpsc channel to send header data from the encoder to the decoder impl CodecBuilder for KafkaCodecBuilder { type Decoder = KafkaDecoder; type Encoder = KafkaEncoder; + + fn new(direction: Direction) -> Self { + Self { direction } + } + fn build(&self) -> (KafkaDecoder, KafkaEncoder) { let (tx, rx) = match self.direction { Direction::Source => (None, None), @@ -45,20 +39,28 @@ impl CodecBuilder for KafkaCodecBuilder { (Some(tx), Some(rx)) } }; - (KafkaDecoder::new(rx), KafkaEncoder::new(tx)) + ( + KafkaDecoder::new(rx, self.direction), + KafkaEncoder::new(tx, self.direction), + ) } } pub struct KafkaDecoder { request_header_rx: Option>, messages: Messages, + direction: Direction, } impl KafkaDecoder { - pub fn new(request_header_rx: Option>) -> Self { + pub fn new( + request_header_rx: Option>, + direction: Direction, + ) -> Self { KafkaDecoder { request_header_rx, messages: vec![], + direction, } } } @@ -85,7 +87,8 @@ impl Decoder for KafkaDecoder { if let Some(size) = get_length_of_full_message(src) { let bytes = src.split_to(size); tracing::debug!( - "incoming kafka message:\n{}", + "{}: incoming kafka message:\n{}", + self.direction, pretty_hex::pretty_hex(&bytes) ); let request_header = if let Some(rx) = self.request_header_rx.as_ref() { @@ -110,11 +113,18 @@ impl Decoder for KafkaDecoder { pub struct KafkaEncoder { request_header_tx: Option>, + direction: Direction, } impl KafkaEncoder { - pub fn new(request_header_tx: Option>) -> Self { - KafkaEncoder { request_header_tx } + pub fn new( + request_header_tx: Option>, + direction: Direction, + ) -> Self { + KafkaEncoder { + request_header_tx, + direction, + } } } @@ -140,7 +150,8 @@ impl Encoder for KafkaEncoder { tx.send(RequestHeader { api_key, version })?; } tracing::debug!( - "outgoing kafka message:\n{}", + "{}: outgoing kafka message:\n{}", + self.direction, pretty_hex::pretty_hex(&&dst[start..]) ); result diff --git a/shotover-proxy/src/codec/mod.rs b/shotover-proxy/src/codec/mod.rs index bf06c04d5..0b9d8592d 100644 --- a/shotover-proxy/src/codec/mod.rs +++ b/shotover-proxy/src/codec/mod.rs @@ -1,5 +1,6 @@ use crate::message::Messages; use cassandra_protocol::compression::Compression; +use core::fmt; use kafka::RequestHeader; use tokio_util::codec::{Decoder, Encoder}; @@ -7,6 +8,21 @@ pub mod cassandra; pub mod kafka; pub mod redis; +#[derive(Copy, Clone)] +pub enum Direction { + Source, + Sink, +} + +impl fmt::Display for Direction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Sink => write!(f, "Sink"), + Self::Source => write!(f, "Source"), + } + } +} + #[derive(Debug, Clone, PartialEq, Copy)] pub enum CodecState { Cassandra { @@ -66,4 +82,6 @@ pub trait CodecBuilder: Clone + Send { type Decoder: DecoderHalf; type Encoder: EncoderHalf; fn build(&self) -> (Self::Decoder, Self::Encoder); + + fn new(direction: Direction) -> Self; } diff --git a/shotover-proxy/src/codec/redis.rs b/shotover-proxy/src/codec/redis.rs index 9a9066edf..9f28774a4 100644 --- a/shotover-proxy/src/codec/redis.rs +++ b/shotover-proxy/src/codec/redis.rs @@ -1,3 +1,4 @@ +use super::Direction; use crate::codec::{CodecBuilder, CodecReadError}; use crate::frame::{Frame, MessageType}; use crate::message::{Encodable, Message, Messages}; @@ -7,34 +8,42 @@ use redis_protocol::resp2::prelude::decode_mut; use redis_protocol::resp2::prelude::encode_bytes; use tokio_util::codec::{Decoder, Encoder}; -#[derive(Default, Clone)] -pub struct RedisCodecBuilder {} +#[derive(Clone)] +pub struct RedisCodecBuilder { + direction: Direction, +} impl CodecBuilder for RedisCodecBuilder { type Decoder = RedisDecoder; type Encoder = RedisEncoder; - fn build(&self) -> (RedisDecoder, RedisEncoder) { - (RedisDecoder::new(), RedisEncoder::new()) + + fn new(direction: Direction) -> Self { + Self { direction } } -} -impl RedisCodecBuilder { - pub fn new() -> RedisCodecBuilder { - RedisCodecBuilder::default() + fn build(&self) -> (RedisDecoder, RedisEncoder) { + ( + RedisDecoder::new(self.direction), + RedisEncoder::new(self.direction), + ) } } -#[derive(Default)] -pub struct RedisEncoder {} +pub struct RedisEncoder { + direction: Direction, +} -#[derive(Default)] pub struct RedisDecoder { messages: Messages, + direction: Direction, } impl RedisDecoder { - pub fn new() -> Self { - Default::default() + pub fn new(direction: Direction) -> Self { + Self { + messages: Vec::new(), + direction, + } } } @@ -49,7 +58,8 @@ impl Decoder for RedisDecoder { })? { Some((frame, _size, bytes)) => { tracing::debug!( - "incoming redis message:\n{}", + "{}: incoming redis message:\n{}", + self.direction, pretty_hex::pretty_hex(&bytes) ); self.messages @@ -68,8 +78,8 @@ impl Decoder for RedisDecoder { } impl RedisEncoder { - pub fn new() -> Self { - Default::default() + pub fn new(direction: Direction) -> Self { + Self { direction } } } @@ -92,7 +102,8 @@ impl Encoder for RedisEncoder { } }; tracing::debug!( - "outgoing redis message:\n{}", + "{}: outgoing redis message:\n{}", + self.direction, pretty_hex::pretty_hex(&&dst[start..]) ); result @@ -102,7 +113,7 @@ impl Encoder for RedisEncoder { #[cfg(test)] mod redis_tests { - use crate::codec::{redis::RedisCodecBuilder, CodecBuilder}; + use crate::codec::{redis::RedisCodecBuilder, CodecBuilder, Direction}; use bytes::BytesMut; use hex_literal::hex; use tokio_util::codec::{Decoder, Encoder}; @@ -130,7 +141,7 @@ mod redis_tests { const HSET_MESSAGE: [u8; 75] = hex!("2a340d0a24340d0a485345540d0a2431380d0a6d797365743a5f5f72616e645f696e745f5f0d0a2432300d0a656c656d656e743a5f5f72616e645f696e745f5f0d0a24330d0a7878780d0a"); fn test_frame(raw_frame: &[u8]) { - let (mut decoder, mut encoder) = RedisCodecBuilder::new().build(); + let (mut decoder, mut encoder) = RedisCodecBuilder::new(Direction::Sink).build(); let message = decoder .decode(&mut BytesMut::from(raw_frame)) .unwrap() diff --git a/shotover-proxy/src/sources/cassandra_source.rs b/shotover-proxy/src/sources/cassandra_source.rs index ca8c3a88c..e1f2eb1ed 100644 --- a/shotover-proxy/src/sources/cassandra_source.rs +++ b/shotover-proxy/src/sources/cassandra_source.rs @@ -1,4 +1,5 @@ -use crate::codec::cassandra::CassandraCodecBuilder; +use crate::codec::Direction; +use crate::codec::{cassandra::CassandraCodecBuilder, CodecBuilder}; use crate::server::TcpCodecListener; use crate::sources::Sources; use crate::tls::{TlsAcceptor, TlsAcceptorConfig}; @@ -67,7 +68,7 @@ impl CassandraSource { name.to_string(), listen_addr.clone(), hard_connection_limit.unwrap_or(false), - CassandraCodecBuilder::new(), + CassandraCodecBuilder::new(Direction::Source), Arc::new(Semaphore::new(connection_limit.unwrap_or(512))), trigger_shutdown_rx.clone(), tls.map(TlsAcceptor::new).transpose()?, diff --git a/shotover-proxy/src/sources/kafka.rs b/shotover-proxy/src/sources/kafka.rs index 4b6b39c51..5c6e73917 100644 --- a/shotover-proxy/src/sources/kafka.rs +++ b/shotover-proxy/src/sources/kafka.rs @@ -1,4 +1,4 @@ -use crate::codec::kafka::{Direction, KafkaCodecBuilder}; +use crate::codec::{kafka::KafkaCodecBuilder, CodecBuilder, Direction}; use crate::server::TcpCodecListener; use crate::sources::Sources; use crate::tls::{TlsAcceptor, TlsAcceptorConfig}; diff --git a/shotover-proxy/src/sources/redis_source.rs b/shotover-proxy/src/sources/redis_source.rs index e98cff251..29c417f71 100644 --- a/shotover-proxy/src/sources/redis_source.rs +++ b/shotover-proxy/src/sources/redis_source.rs @@ -1,4 +1,4 @@ -use crate::codec::redis::RedisCodecBuilder; +use crate::codec::{redis::RedisCodecBuilder, CodecBuilder, Direction}; use crate::server::TcpCodecListener; use crate::sources::Sources; use crate::tls::{TlsAcceptor, TlsAcceptorConfig}; @@ -65,7 +65,7 @@ impl RedisSource { name.to_string(), listen_addr.clone(), hard_connection_limit.unwrap_or(false), - RedisCodecBuilder::new(), + RedisCodecBuilder::new(Direction::Source), Arc::new(Semaphore::new(connection_limit.unwrap_or(512))), trigger_shutdown_rx.clone(), tls.map(TlsAcceptor::new).transpose()?, diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs index 305594604..a04093bfb 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs @@ -1,4 +1,5 @@ use crate::codec::cassandra::CassandraCodecBuilder; +use crate::codec::{CodecBuilder, Direction}; use crate::frame::Frame; use crate::message::{Message, Messages}; use crate::tls::{TlsConnector, ToHostname}; @@ -107,7 +108,7 @@ impl ConnectionFactory { let outbound = CassandraConnection::new( self.connect_timeout, address, - CassandraCodecBuilder::new(), + CassandraCodecBuilder::new(Direction::Sink), self.tls.clone(), self.pushed_messages_tx.clone(), ) diff --git a/shotover-proxy/src/transforms/cassandra/sink_single.rs b/shotover-proxy/src/transforms/cassandra/sink_single.rs index cb714a564..3f03018f4 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_single.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_single.rs @@ -1,5 +1,5 @@ use super::connection::CassandraConnection; -use crate::codec::cassandra::CassandraCodecBuilder; +use crate::codec::{cassandra::CassandraCodecBuilder, CodecBuilder, Direction}; use crate::error::ChainResponse; use crate::frame::cassandra::CassandraMetadata; use crate::message::{Messages, Metadata}; @@ -131,7 +131,7 @@ impl CassandraSinkSingle { CassandraConnection::new( self.connect_timeout, self.address.clone(), - CassandraCodecBuilder::new(), + CassandraCodecBuilder::new(Direction::Sink), self.tls.clone(), self.pushed_messages_tx.clone(), ) diff --git a/shotover-proxy/src/transforms/kafka/sink_single.rs b/shotover-proxy/src/transforms/kafka/sink_single.rs index a3e4c2efc..29fc99c04 100644 --- a/shotover-proxy/src/transforms/kafka/sink_single.rs +++ b/shotover-proxy/src/transforms/kafka/sink_single.rs @@ -1,4 +1,4 @@ -use crate::codec::kafka::{Direction, KafkaCodecBuilder}; +use crate::codec::{kafka::KafkaCodecBuilder, CodecBuilder, Direction}; use crate::error::ChainResponse; use crate::frame::kafka::{KafkaFrame, ResponseBody}; use crate::frame::Frame; diff --git a/shotover-proxy/src/transforms/redis/cluster_ports_rewrite.rs b/shotover-proxy/src/transforms/redis/cluster_ports_rewrite.rs index b69f7e3d1..1ea4a0e68 100644 --- a/shotover-proxy/src/transforms/redis/cluster_ports_rewrite.rs +++ b/shotover-proxy/src/transforms/redis/cluster_ports_rewrite.rs @@ -216,6 +216,7 @@ fn is_cluster_slots(frame: &Frame) -> bool { mod test { use super::*; use crate::codec::redis::RedisDecoder; + use crate::codec::Direction; use crate::transforms::redis::sink_cluster::parse_slots; use tokio_util::codec::Decoder; @@ -278,7 +279,7 @@ mod test { #[test] fn test_rewrite_port_slots() { let slots_pcap: &[u8] = b"*3\r\n*4\r\n:10923\r\n:16383\r\n*3\r\n$12\r\n192.168.80.6\r\n:6379\r\n$40\r\n3a7c357ed75d2aa01fca1e14ef3735a2b2b8ffac\r\n*3\r\n$12\r\n192.168.80.3\r\n:6379\r\n$40\r\n77c01b0ddd8668fff05e3f6a8aaf5f3ccd454a79\r\n*4\r\n:5461\r\n:10922\r\n*3\r\n$12\r\n192.168.80.5\r\n:6379\r\n$40\r\n969c6215d064e68593d384541ceeb57e9520dbed\r\n*3\r\n$12\r\n192.168.80.2\r\n:6379\r\n$40\r\n3929f69990a75be7b2d49594c57fe620862e6fd6\r\n*4\r\n:0\r\n:5460\r\n*3\r\n$12\r\n192.168.80.7\r\n:6379\r\n$40\r\n15d52a65d1fc7a53e34bf9193415aa39136882b2\r\n*3\r\n$12\r\n192.168.80.4\r\n:6379\r\n$40\r\ncd023916a3528fae7e606a10d8289a665d6c47b0\r\n"; - let mut codec = RedisDecoder::new(); + let mut codec = RedisDecoder::new(Direction::Sink); let mut message = codec .decode(&mut slots_pcap.into()) .unwrap() diff --git a/shotover-proxy/src/transforms/redis/sink_cluster.rs b/shotover-proxy/src/transforms/redis/sink_cluster.rs index 2c19ca74e..b443c5924 100644 --- a/shotover-proxy/src/transforms/redis/sink_cluster.rs +++ b/shotover-proxy/src/transforms/redis/sink_cluster.rs @@ -1,4 +1,5 @@ use crate::codec::redis::RedisCodecBuilder; +use crate::codec::{CodecBuilder, Direction}; use crate::error::ChainResponse; use crate::frame::{Frame, RedisFrame}; use crate::message::Message; @@ -101,7 +102,7 @@ impl RedisSinkCluster { let connect_timeout = Duration::from_millis(connect_timeout_ms); let connection_pool = ConnectionPool::new_with_auth( connect_timeout, - RedisCodecBuilder::new(), + RedisCodecBuilder::new(Direction::Sink), authenticator, tls, )?; @@ -1069,6 +1070,7 @@ impl Authenticator for RedisAuthenticator { mod test { use super::*; use crate::codec::redis::RedisDecoder; + use crate::codec::Direction; use tokio_util::codec::Decoder; #[test] @@ -1076,7 +1078,7 @@ mod test { // Wireshark capture from a Redis cluster with 3 masters and 3 replicas. let slots_pcap: &[u8] = b"*3\r\n*4\r\n:10923\r\n:16383\r\n*3\r\n$12\r\n192.168.80.6\r\n:6379\r\n$40\r\n3a7c357ed75d2aa01fca1e14ef3735a2b2b8ffac\r\n*3\r\n$12\r\n192.168.80.3\r\n:6379\r\n$40\r\n77c01b0ddd8668fff05e3f6a8aaf5f3ccd454a79\r\n*4\r\n:5461\r\n:10922\r\n*3\r\n$12\r\n192.168.80.5\r\n:6379\r\n$40\r\n969c6215d064e68593d384541ceeb57e9520dbed\r\n*3\r\n$12\r\n192.168.80.2\r\n:6379\r\n$40\r\n3929f69990a75be7b2d49594c57fe620862e6fd6\r\n*4\r\n:0\r\n:5460\r\n*3\r\n$12\r\n192.168.80.7\r\n:6379\r\n$40\r\n15d52a65d1fc7a53e34bf9193415aa39136882b2\r\n*3\r\n$12\r\n192.168.80.4\r\n:6379\r\n$40\r\ncd023916a3528fae7e606a10d8289a665d6c47b0\r\n"; - let mut codec = RedisDecoder::new(); + let mut codec = RedisDecoder::new(Direction::Sink); let mut message = codec .decode(&mut slots_pcap.into()) diff --git a/shotover-proxy/src/transforms/redis/sink_single.rs b/shotover-proxy/src/transforms/redis/sink_single.rs index 042debcb1..48be9fb9f 100644 --- a/shotover-proxy/src/transforms/redis/sink_single.rs +++ b/shotover-proxy/src/transforms/redis/sink_single.rs @@ -1,16 +1,13 @@ -use crate::codec::redis::RedisCodecBuilder; -use crate::codec::redis::RedisDecoder; -use crate::codec::redis::RedisEncoder; -use crate::codec::CodecBuilder; -use crate::codec::CodecReadError; -use crate::error::ChainResponse; -use crate::frame::Frame; -use crate::frame::RedisFrame; +use crate::codec::{ + redis::{RedisCodecBuilder, RedisDecoder, RedisEncoder}, + CodecBuilder, CodecReadError, Direction, +}; +use crate::frame::{Frame, RedisFrame}; use crate::message::{Message, Messages}; use crate::tcp; use crate::tls::{AsyncStream, TlsConnector, TlsConnectorConfig}; -use crate::transforms::Transforms; -use crate::transforms::{Transform, TransformBuilder, Wrapper}; +use crate::transforms::ChainResponse; +use crate::transforms::{Transform, TransformBuilder, Transforms, Wrapper}; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use futures::{FutureExt, SinkExt, StreamExt}; @@ -19,11 +16,9 @@ use serde::Deserialize; use std::fmt::Debug; use std::pin::Pin; use std::time::Duration; -use tokio::io::ReadHalf; -use tokio::io::WriteHalf; +use tokio::io::{ReadHalf, WriteHalf}; use tokio::sync::mpsc; -use tokio_util::codec::FramedRead; -use tokio_util::codec::FramedWrite; +use tokio_util::codec::{FramedRead, FramedWrite}; use tracing::Instrument; #[derive(Deserialize, Debug)] @@ -132,7 +127,7 @@ impl Transform for RedisSinkSingle { Box::pin(tcp_stream) as Pin> }; - let (decoder, encoder) = RedisCodecBuilder::new().build(); + let (decoder, encoder) = RedisCodecBuilder::new(Direction::Sink).build(); let (stream_rx, stream_tx) = tokio::io::split(generic_stream); let outbound_tx = FramedWrite::new(stream_tx, encoder); let outbound_rx = FramedRead::new(stream_rx, decoder); diff --git a/shotover-proxy/src/transforms/util/cluster_connection_pool.rs b/shotover-proxy/src/transforms/util/cluster_connection_pool.rs index c5c9ce932..d85bc80bf 100644 --- a/shotover-proxy/src/transforms/util/cluster_connection_pool.rs +++ b/shotover-proxy/src/transforms/util/cluster_connection_pool.rs @@ -288,6 +288,7 @@ async fn rx_process( mod test { use super::spawn_read_write_tasks; use crate::codec::redis::RedisCodecBuilder; + use crate::codec::{CodecBuilder, Direction}; use std::mem; use std::time::Duration; use tokio::io::AsyncReadExt; @@ -318,7 +319,7 @@ mod test { let stream = TcpStream::connect(("127.0.0.1", port)).await.unwrap(); let (rx, tx) = stream.into_split(); - let codec = RedisCodecBuilder::new(); + let codec = RedisCodecBuilder::new(Direction::Sink); let sender = spawn_read_write_tasks(&codec, rx, tx); assert!(remote.await.unwrap()); @@ -358,7 +359,7 @@ mod test { let stream = TcpStream::connect(("127.0.0.1", port)).await.unwrap(); let (rx, tx) = stream.into_split(); - let codec = RedisCodecBuilder::new(); + let codec = RedisCodecBuilder::new(Direction::Sink); // Drop sender immediately. std::mem::drop(spawn_read_write_tasks(&codec, rx, tx));