diff --git a/Cargo.lock b/Cargo.lock index 18dcf0fdf..dbe313423 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,9 +93,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.57" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f" +checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c" dependencies = [ "proc-macro2", "quote", @@ -216,9 +216,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.11.0" +version = "3.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d" +checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" [[package]] name = "byteorder" @@ -324,7 +324,7 @@ dependencies = [ "snap", "thiserror", "time 0.3.15", - "uuid 1.2.1", + "uuid", ] [[package]] @@ -362,7 +362,7 @@ dependencies = [ "thiserror", "tokio", "tracing", - "uuid 1.2.1", + "uuid", ] [[package]] @@ -435,9 +435,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.0.15" +version = "4.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bf8832993da70a4c6d13c581f4463c2bdda27b9bf1c5498dc4365543abe6d6f" +checksum = "06badb543e734a2d6568e19a40af66ed5364360b9226184926f89d229b4b4267" dependencies = [ "atty", "bitflags", @@ -571,7 +571,7 @@ dependencies = [ "serde", "tree-sitter", "tree-sitter-cql", - "uuid 1.2.1", + "uuid", ] [[package]] @@ -1008,7 +1008,7 @@ dependencies = [ "cfg-if", "libc", "redox_syscall", - "windows-sys", + "windows-sys 0.36.1", ] [[package]] @@ -1715,7 +1715,7 @@ dependencies = [ "libc", "log", "wasi 0.11.0+wasi-snapshot-preview1", - "windows-sys", + "windows-sys 0.36.1", ] [[package]] @@ -2078,15 +2078,15 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" +checksum = "4dc9e0dc2adc1c69d09143aff38d3d30c5c3f0df0dad82e6d25547af174ebec0" dependencies = [ "cfg-if", "libc", "redox_syscall", "smallvec", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -2107,7 +2107,7 @@ dependencies = [ "libloading", "pkg-config", "regex", - "windows-sys", + "windows-sys 0.36.1", ] [[package]] @@ -2470,9 +2470,9 @@ dependencies = [ [[package]] name = "redis" -version = "0.22.0" +version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e5450e9add55de0ea728a964191669d3707fa20666c5f16753f3a6a51e96231" +checksum = "513b3649f1a111c17954296e4a3b9eecb108b766c803e2b99f179ebe27005985" dependencies = [ "async-trait", "bytes 1.2.1", @@ -2771,7 +2771,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2" dependencies = [ "lazy_static", - "windows-sys", + "windows-sys 0.36.1", ] [[package]] @@ -2817,7 +2817,7 @@ dependencies = [ "tokio", "tokio-openssl", "tracing", - "uuid 1.2.1", + "uuid", ] [[package]] @@ -2837,7 +2837,7 @@ dependencies = [ "snap", "thiserror", "tokio", - "uuid 1.2.1", + "uuid", ] [[package]] @@ -3049,7 +3049,7 @@ dependencies = [ "cassandra-protocol", "cdrs-tokio", "chacha20poly1305", - "clap 4.0.15", + "clap 4.0.17", "cql3-parser", "crc16", "criterion", @@ -3102,7 +3102,7 @@ dependencies = [ "tracing", "tracing-appender", "tracing-subscriber", - "uuid 1.2.1", + "uuid", "version-compare", ] @@ -3673,12 +3673,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "uuid" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" - [[package]] name = "uuid" version = "1.2.1" @@ -3859,43 +3853,100 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", + "windows_aarch64_msvc 0.36.1", + "windows_i686_gnu 0.36.1", + "windows_i686_msvc 0.36.1", + "windows_x86_64_gnu 0.36.1", + "windows_x86_64_msvc 0.36.1", ] +[[package]] +name = "windows-sys" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc 0.42.0", + "windows_i686_gnu 0.42.0", + "windows_i686_msvc 0.42.0", + "windows_x86_64_gnu 0.42.0", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc 0.42.0", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" + [[package]] name = "windows_aarch64_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" + [[package]] name = "windows_i686_gnu" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" +[[package]] +name = "windows_i686_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" + [[package]] name = "windows_i686_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" +[[package]] +name = "windows_i686_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" + [[package]] name = "windows_x86_64_gnu" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" + [[package]] name = "windows_x86_64_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" + [[package]] name = "winreg" version = "0.10.1" diff --git a/shotover-proxy/benches/benches/chain.rs b/shotover-proxy/benches/benches/chain.rs index 6684eef65..ad916112e 100644 --- a/shotover-proxy/benches/benches/chain.rs +++ b/shotover-proxy/benches/benches/chain.rs @@ -231,12 +231,13 @@ fn criterion_benchmark(c: &mut Criterion) { let wrapper = Wrapper::new_with_chain_name( vec![Message::from_bytes( - CassandraFrame::new( - Version::V4, - Flags::default(), - 0, - vec![], - CassandraOperation::Query { + CassandraFrame { + version: Version::V4, + flags: Flags::default(), + stream_id: 0, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Query { query: Box::new(parse_statement_single( "INSERT INTO foo (z, v) VALUES (1, 123)", )), @@ -252,8 +253,7 @@ fn criterion_benchmark(c: &mut Criterion) { now_in_seconds: None, }), }, - None, - ) + } .encode() .encode_with(Compression::None) .unwrap() @@ -339,12 +339,13 @@ fn criterion_benchmark(c: &mut Criterion) { fn cassandra_parsed_query(query: &str) -> Wrapper { Wrapper::new_with_chain_name( - vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 0, - vec![], - CassandraOperation::Query { + vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + stream_id: 0, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Query { query: Box::new(parse_statement_single(query)), params: Box::new(QueryParams { consistency: Consistency::One, @@ -358,8 +359,7 @@ fn cassandra_parsed_query(query: &str) -> Wrapper { now_in_seconds: None, }), }, - None, - )))], + }))], "bench".into(), "127.0.0.1:6379".parse().unwrap(), ) diff --git a/shotover-proxy/benches/benches/codec.rs b/shotover-proxy/benches/benches/codec.rs index a212bfa6f..e5a412997 100644 --- a/shotover-proxy/benches/benches/codec.rs +++ b/shotover-proxy/benches/benches/codec.rs @@ -18,17 +18,17 @@ fn criterion_benchmark(c: &mut Criterion) { group.noise_threshold(0.2); { - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 1, - vec![], - CassandraOperation::Query { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + stream_id: 1, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Query { query: Box::new(parse_statement_single("SELECT * FROM system.local;")), params: Box::new(QueryParams::default()), }, - None, - )))]; + }))]; let mut codec = CassandraCodec::new(); @@ -46,14 +46,14 @@ fn criterion_benchmark(c: &mut Criterion) { } { - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 0, - vec![], - CassandraOperation::Result(peers_v2_result()), - None, - )))]; + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + stream_id: 0, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Result(peers_v2_result()), + }))]; let mut codec = CassandraCodec::new(); diff --git a/shotover-proxy/src/codec/cassandra.rs b/shotover-proxy/src/codec/cassandra.rs index e830729b7..75a9a6035 100644 --- a/shotover-proxy/src/codec/cassandra.rs +++ b/shotover-proxy/src/codec/cassandra.rs @@ -183,17 +183,17 @@ fn reject_protocol_version(version: u8) -> CodecReadError { ); CodecReadError::RespondAndThenCloseConnection(vec![Message::from_frame(Frame::Cassandra( - CassandraFrame::new( - Version::V4, - Flags::default(), - 0, - vec![], - CassandraOperation::Error(ErrorBody { + CassandraFrame { + version: Version::V4, + flags: Flags::default(), + stream_id: 0, + operation: CassandraOperation::Error(ErrorBody { message: "Invalid or unsupported protocol version".into(), ty: ErrorType::Protocol, }), - None, - ), + tracing_id: None, + warnings: vec![], + }, ))]) } @@ -280,16 +280,16 @@ mod cassandra_protocol_tests { fn test_codec_startup() { let mut codec = CassandraCodec::new(); let bytes = hex!("0400000001000000160001000b43514c5f56455253494f4e0005332e302e30"); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 0, - vec![], - CassandraOperation::Startup(vec![ + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + operation: CassandraOperation::Startup(vec![ 0, 1, 0, 11, 67, 81, 76, 95, 86, 69, 82, 83, 73, 79, 78, 0, 5, 51, 46, 48, 46, 48, ]), - None, - )))]; + stream_id: 0, + tracing_id: None, + warnings: vec![], + }))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } @@ -297,14 +297,14 @@ mod cassandra_protocol_tests { fn test_codec_options() { let mut codec = CassandraCodec::new(); let bytes = hex!("040000000500000000"); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 0, - vec![], - CassandraOperation::Options(vec![]), - None, - )))]; + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + operation: CassandraOperation::Options(vec![]), + stream_id: 0, + tracing_id: None, + warnings: vec![], + }))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } @@ -312,14 +312,14 @@ mod cassandra_protocol_tests { fn test_codec_ready() { let mut codec = CassandraCodec::new(); let bytes = hex!("840000000200000000"); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 0, - vec![], - CassandraOperation::Ready(vec![]), - None, - )))]; + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + operation: CassandraOperation::Ready(vec![]), + stream_id: 0, + tracing_id: None, + warnings: vec![], + }))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } @@ -330,20 +330,20 @@ mod cassandra_protocol_tests { "040000010b000000310003000f544f504f4c4f47595f4348414e4745 000d5354415455535f4348414e4745000d534348454d415f4348414e4745" ); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 1, - vec![], - CassandraOperation::Register(BodyReqRegister { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + operation: CassandraOperation::Register(BodyReqRegister { events: vec![ SimpleServerEvent::TopologyChange, SimpleServerEvent::StatusChange, SimpleServerEvent::SchemaChange, ], }), - None, - )))]; + stream_id: 1, + tracing_id: None, + warnings: vec![], + }))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } @@ -356,12 +356,10 @@ mod cassandra_protocol_tests { 65727265645f6970001000047261636b000d000f72656c656173655f76657273696f6e000d000b7270635f616464726 573730010000e736368656d615f76657273696f6e000c0006746f6b656e730022000d00000000" ); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 2, - vec![], - CassandraOperation::Result(CassandraResult::Rows { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + operation: CassandraOperation::Result(CassandraResult::Rows { rows: vec![], metadata: Box::new(RowsMetadata { flags: RowsMetadataFlags::GLOBAL_TABLE_SPACE, @@ -451,8 +449,10 @@ mod cassandra_protocol_tests { ], }), }), - None, - )))]; + stream_id: 2, + tracing_id: None, + warnings: vec![], + }))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } @@ -464,19 +464,19 @@ mod cassandra_protocol_tests { 74656d2e6c6f63616c205748455245206b6579203d20276c6f63616c27000100" ); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 3, - vec![], - CassandraOperation::Query { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + stream_id: 3, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Query { query: Box::new(parse_statement_single( "SELECT * FROM system.local WHERE key = 'local'", )), params: Box::new(QueryParams::default()), }, - None, - )))]; + }))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } @@ -488,19 +488,19 @@ mod cassandra_protocol_tests { 6d2e666f6f2028626172292056414c554553202827626172322729000100" ); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 3, - vec![], - CassandraOperation::Query { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + stream_id: 3, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Query { query: Box::new(parse_statement_single( "INSERT INTO system.foo (bar) VALUES ('bar2')", )), params: Box::new(QueryParams::default()), }, - None, - )))]; + }))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } } diff --git a/shotover-proxy/src/frame/cassandra.rs b/shotover-proxy/src/frame/cassandra.rs index 7efb6e028..7184501ad 100644 --- a/shotover-proxy/src/frame/cassandra.rs +++ b/shotover-proxy/src/frame/cassandra.rs @@ -98,64 +98,25 @@ pub struct CassandraMetadata { // missing `warnings` field because we are not using it currently } -#[derive(PartialEq, Debug, Clone, Copy)] -pub enum Tracing { - Request(bool), - Response(Option), -} - -#[allow(clippy::from_over_into)] -impl Into> for Tracing { - fn into(self) -> Option { - match self { - Self::Request(_) => None, - Self::Response(uuid) => uuid, - } - } -} - #[derive(PartialEq, Debug, Clone)] pub struct CassandraFrame { pub version: Version, pub flags: Flags, pub stream_id: StreamId, - pub tracing: Tracing, + pub tracing_id: Option, pub warnings: Vec, /// Contains the message body pub operation: CassandraOperation, } impl CassandraFrame { - pub fn new( - version: Version, - flags: Flags, - stream_id: StreamId, - warnings: Vec, - operation: CassandraOperation, - tracing_id: Option, - ) -> Self { - let tracing = match operation.to_direction() { - Direction::Request => Tracing::Request(flags.contains(Flags::TRACING)), - Direction::Response => Tracing::Response(tracing_id), - }; - - Self { - version, - flags, - stream_id, - warnings, - operation, - tracing, - } - } - /// Return `CassandraMetadata` from this `CassandraFrame` pub(crate) fn metadata(&self) -> CassandraMetadata { CassandraMetadata { version: self.version, flags: self.flags, stream_id: self.stream_id, - tracing_id: self.tracing.into(), + tracing_id: self.tracing_id, opcode: self.operation.to_opcode(), } } @@ -333,16 +294,11 @@ impl CassandraFrame { Opcode::AuthSuccess => CassandraOperation::AuthSuccess(frame.body), }; - let tracing = match operation.to_direction() { - Direction::Request => Tracing::Request(frame.flags.contains(Flags::TRACING)), - Direction::Response => Tracing::Response(frame.tracing_id), - }; - Ok(CassandraFrame { version: frame.version, flags: frame.flags, stream_id: frame.stream_id, - tracing, + tracing_id: frame.tracing_id, warnings: frame.warnings, operation, }) @@ -364,7 +320,7 @@ impl CassandraFrame { opcode: self.operation.to_opcode(), stream_id: self.stream_id, body: self.operation.into_body(self.version), - tracing_id: self.tracing.into(), + tracing_id: self.tracing_id, warnings: self.warnings, } } @@ -735,8 +691,7 @@ pub struct CassandraBatch { impl Display for CassandraFrame { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { write!(f, "{} stream:{}", self.version, self.stream_id)?; - let tracing_id: Option = self.tracing.into(); - if let Some(tracing_id) = tracing_id { + if let Some(tracing_id) = self.tracing_id { write!(f, " tracing_id:{}", tracing_id)?; } if !self.warnings.is_empty() { diff --git a/shotover-proxy/src/message/mod.rs b/shotover-proxy/src/message/mod.rs index bcc1db2a3..cbf96f805 100644 --- a/shotover-proxy/src/message/mod.rs +++ b/shotover-proxy/src/message/mod.rs @@ -213,17 +213,17 @@ impl Message { Frame::Redis(_) => Frame::Redis(RedisFrame::Error( "ERR Message was filtered out by shotover".into(), )), - Frame::Cassandra(frame) => Frame::Cassandra(CassandraFrame::new( - frame.version, - frame.flags, - frame.stream_id, - vec![], - CassandraOperation::Error(ErrorBody { + Frame::Cassandra(frame) => Frame::Cassandra(CassandraFrame { + version: frame.version, + flags: frame.flags, + stream_id: frame.stream_id, + operation: CassandraOperation::Error(ErrorBody { message: "Message was filtered out by shotover".into(), ty: ErrorType::Server, }), - frame.tracing.into(), - )), + tracing_id: frame.tracing_id, + warnings: vec![], + }), Frame::None => Frame::None, }) } @@ -243,17 +243,17 @@ impl Message { Metadata::Redis => { Frame::Redis(RedisFrame::Error(Str::from_inner(error.into()).unwrap())) } - Metadata::Cassandra(frame) => Frame::Cassandra(CassandraFrame::new( - frame.version, - frame.flags, - frame.stream_id, - vec![], - CassandraOperation::Error(ErrorBody { + Metadata::Cassandra(frame) => Frame::Cassandra(CassandraFrame { + version: frame.version, + flags: frame.flags, + stream_id: frame.stream_id, + operation: CassandraOperation::Error(ErrorBody { message: error, ty: ErrorType::Server, }), - frame.tracing_id, - )), + tracing_id: frame.tracing_id, + warnings: vec![], + }), Metadata::None => Frame::None, }); self.invalidate_cache(); @@ -291,14 +291,14 @@ impl Message { ty: ErrorType::Overloaded, }); - Frame::Cassandra(CassandraFrame::new( - metadata.version, - metadata.flags, - metadata.stream_id, - vec![], - body, - metadata.tracing_id, - )) + Frame::Cassandra(CassandraFrame { + version: metadata.version, + flags: metadata.flags, + stream_id: metadata.stream_id, + tracing_id: metadata.tracing_id, + warnings: vec![], + operation: body, + }) } Metadata::Redis => { unimplemented!() diff --git a/shotover-proxy/src/transforms/cassandra/peers_rewrite.rs b/shotover-proxy/src/transforms/cassandra/peers_rewrite.rs index 1b471c50b..9cc0a2761 100644 --- a/shotover-proxy/src/transforms/cassandra/peers_rewrite.rs +++ b/shotover-proxy/src/transforms/cassandra/peers_rewrite.rs @@ -146,12 +146,13 @@ mod test { use cassandra_protocol::query::QueryParams; fn create_query_message(query: &str) -> Message { - Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 0, - vec![], - CassandraOperation::Query { + Message::from_frame(Frame::Cassandra(CassandraFrame { + flags: Flags::default(), + version: Version::V4, + stream_id: 0, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Query { query: Box::new(parse_statement_single(query)), params: Box::new(QueryParams { keyspace: None, @@ -165,17 +166,17 @@ mod test { timestamp: Some(1643855761086585), }), }, - None, - ))) + })) } fn create_response_message(col_specs: &[ColSpec], rows: Vec>) -> Message { - Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 0, - vec![], - CassandraOperation::Result(Rows { + Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + stream_id: 0, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Result(Rows { rows, metadata: Box::new(RowsMetadata { flags: RowsMetadataFlags::GLOBAL_TABLE_SPACE, @@ -189,8 +190,7 @@ mod test { col_specs: col_specs.to_owned(), }), }), - None, - ))) + })) } #[test] diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs index ea80e47f9..0479e22e5 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs @@ -1,5 +1,5 @@ use crate::error::ChainResponse; -use crate::frame::cassandra::{parse_statement_single, CassandraMetadata, Tracing}; +use crate::frame::cassandra::{parse_statement_single, CassandraMetadata}; use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame}; use crate::message::{IntSize, Message, MessageValue, Messages}; use crate::tls::{TlsConnector, TlsConnectorConfig}; @@ -194,17 +194,17 @@ impl CassandraSinkCluster { fn create_query(messages: &Messages, query: &str, version: Version) -> Result { let stream_id = get_unused_stream_id(messages)?; - Ok(Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Ok(Message::from_frame(Frame::Cassandra(CassandraFrame { version, - Flags::default(), + flags: Flags::default(), stream_id, - vec![], - CassandraOperation::Query { + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Query { query: Box::new(parse_statement_single(query)), params: Box::new(QueryParams::default()), }, - None, - )))) + }))) } impl CassandraSinkCluster { @@ -366,19 +366,19 @@ impl CassandraSinkCluster { .send(Response { original: message.clone(), response: Ok(Message::from_frame(Frame::Cassandra( - CassandraFrame::new( - metadata.version, - metadata.flags.difference(Flags::TRACING), // we don't have a tracing id because we didn't actually hit a node - metadata.stream_id, - vec![], - CassandraOperation::Error(ErrorBody { + CassandraFrame { + operation: CassandraOperation::Error(ErrorBody { message: "Shotover does not have this query's metadata. Please re-prepare on this Shotover host before sending again.".into(), ty: ErrorType::Unprepared(UnpreparedError { id, }), }), - None, - ), + flags: metadata.flags.difference(Flags::TRACING), // we don't have a tracing id because we didn't actually hit a node + stream_id: metadata.stream_id, + tracing_id: None, + version: metadata.version, + warnings: vec![], + }, ))), }).expect("the receiver is guaranteed to be alive, so this must succeed"); } @@ -888,7 +888,7 @@ fn get_execute_message(message: &mut Message) -> Option<(&BodyReqExecuteOwned, C version, flags, stream_id, - tracing, + tracing_id, .. })) = message.frame() { @@ -898,7 +898,7 @@ fn get_execute_message(message: &mut Message) -> Option<(&BodyReqExecuteOwned, C version: *version, flags: *flags, stream_id: *stream_id, - tracing_id: >>::into(*tracing), + tracing_id: *tracing_id, opcode: Opcode::Execute, }, )); diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs index b6cee4fb6..ca1ed4944 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs @@ -144,19 +144,19 @@ async fn register_for_topology_and_status_events( let (tx, rx) = oneshot::channel(); connection .send( - Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Message::from_frame(Frame::Cassandra(CassandraFrame { version, - Flags::default(), - 0, - vec![], - CassandraOperation::Register(BodyReqRegister { + stream_id: 0, + flags: Flags::default(), + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Register(BodyReqRegister { events: vec![ SimpleServerEvent::TopologyChange, SimpleServerEvent::StatusChange, ], }), - None, - ))), + })), tx, ) .unwrap(); @@ -197,19 +197,19 @@ mod system_local { ) -> Result> { let (tx, rx) = oneshot::channel(); connection.send( - Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 1, - vec![], - CassandraOperation::Query { + Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + stream_id: 1, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Query { query: Box::new(parse_statement_single( "SELECT rack, tokens, host_id, data_center FROM system.local", )), params: Box::new(QueryParams::default()), }, - None, - ))), + })), tx, )?; @@ -284,19 +284,19 @@ mod system_peers { ) -> Result> { let (tx, rx) = oneshot::channel(); connection.send( - Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 0, - vec![], - CassandraOperation::Query { + Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + stream_id: 0, + flags: Flags::default(), + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Query { query: Box::new(parse_statement_single( "SELECT native_port, native_address, rack, tokens, host_id, data_center FROM system.peers_v2", )), params: Box::new(QueryParams::default()), }, - None - ))), + })), tx, )?; @@ -305,19 +305,19 @@ mod system_peers { if is_peers_v2_does_not_exist_error(&mut response) { let (tx, rx) = oneshot::channel(); connection.send( - Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 0, - vec![], - CassandraOperation::Query { + Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + stream_id: 0, + flags: Flags::default(), + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Query { query: Box::new(parse_statement_single( "SELECT peer, rack, tokens, host_id, data_center FROM system.peers", )), params: Box::new(QueryParams::default()), }, - None, - ))), + })), tx, )?; response = rx.await?.response?; diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster.rs b/shotover-proxy/tests/cassandra_int_tests/cluster.rs index 0c453bfa2..65d478779 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster.rs @@ -45,21 +45,23 @@ pub async fn run_topology_task(ca_path: Option<&str>, port: Option) -> Vec< fn create_handshake() -> Vec { vec![ - Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 64, - vec![], - CassandraOperation::Startup(b"\0\x01\0\x0bCQL_VERSION\0\x053.0.0".to_vec()), - None, - ))), - Message::from_frame(Frame::Cassandra(CassandraFrame::new( - Version::V4, - Flags::default(), - 128, - vec![], - CassandraOperation::AuthResponse(b"\0\0\0\x14\0cassandra\0cassandra".to_vec()), - None, - ))), + Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + stream_id: 64, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Startup(b"\0\x01\0\x0bCQL_VERSION\0\x053.0.0".to_vec()), + })), + Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + flags: Flags::default(), + stream_id: 128, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::AuthResponse( + b"\0\0\0\x14\0cassandra\0cassandra".to_vec(), + ), + })), ] }