diff --git a/shotover-proxy/benches/benches/chain.rs b/shotover-proxy/benches/benches/chain.rs index ad916112e..6684eef65 100644 --- a/shotover-proxy/benches/benches/chain.rs +++ b/shotover-proxy/benches/benches/chain.rs @@ -231,13 +231,12 @@ fn criterion_benchmark(c: &mut Criterion) { let wrapper = Wrapper::new_with_chain_name( vec![Message::from_bytes( - CassandraFrame { - version: Version::V4, - flags: Flags::default(), - stream_id: 0, - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Query { + CassandraFrame::new( + Version::V4, + Flags::default(), + 0, + vec![], + CassandraOperation::Query { query: Box::new(parse_statement_single( "INSERT INTO foo (z, v) VALUES (1, 123)", )), @@ -253,7 +252,8 @@ fn criterion_benchmark(c: &mut Criterion) { now_in_seconds: None, }), }, - } + None, + ) .encode() .encode_with(Compression::None) .unwrap() @@ -339,13 +339,12 @@ fn criterion_benchmark(c: &mut Criterion) { fn cassandra_parsed_query(query: &str) -> Wrapper { Wrapper::new_with_chain_name( - vec![Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - stream_id: 0, - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Query { + vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 0, + vec![], + CassandraOperation::Query { query: Box::new(parse_statement_single(query)), params: Box::new(QueryParams { consistency: Consistency::One, @@ -359,7 +358,8 @@ fn cassandra_parsed_query(query: &str) -> Wrapper { now_in_seconds: None, }), }, - }))], + None, + )))], "bench".into(), "127.0.0.1:6379".parse().unwrap(), ) diff --git a/shotover-proxy/benches/benches/codec.rs b/shotover-proxy/benches/benches/codec.rs index e5a412997..a212bfa6f 100644 --- a/shotover-proxy/benches/benches/codec.rs +++ b/shotover-proxy/benches/benches/codec.rs @@ -18,17 +18,17 @@ fn criterion_benchmark(c: &mut Criterion) { group.noise_threshold(0.2); { - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - stream_id: 1, - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Query { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 1, + vec![], + CassandraOperation::Query { query: Box::new(parse_statement_single("SELECT * FROM system.local;")), params: Box::new(QueryParams::default()), }, - }))]; + None, + )))]; let mut codec = CassandraCodec::new(); @@ -46,14 +46,14 @@ fn criterion_benchmark(c: &mut Criterion) { } { - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - stream_id: 0, - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Result(peers_v2_result()), - }))]; + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 0, + vec![], + CassandraOperation::Result(peers_v2_result()), + None, + )))]; let mut codec = CassandraCodec::new(); diff --git a/shotover-proxy/src/codec/cassandra.rs b/shotover-proxy/src/codec/cassandra.rs index 75a9a6035..e830729b7 100644 --- a/shotover-proxy/src/codec/cassandra.rs +++ b/shotover-proxy/src/codec/cassandra.rs @@ -183,17 +183,17 @@ fn reject_protocol_version(version: u8) -> CodecReadError { ); CodecReadError::RespondAndThenCloseConnection(vec![Message::from_frame(Frame::Cassandra( - CassandraFrame { - version: Version::V4, - flags: Flags::default(), - stream_id: 0, - operation: CassandraOperation::Error(ErrorBody { + CassandraFrame::new( + Version::V4, + Flags::default(), + 0, + vec![], + CassandraOperation::Error(ErrorBody { message: "Invalid or unsupported protocol version".into(), ty: ErrorType::Protocol, }), - tracing_id: None, - warnings: vec![], - }, + None, + ), ))]) } @@ -280,16 +280,16 @@ mod cassandra_protocol_tests { fn test_codec_startup() { let mut codec = CassandraCodec::new(); let bytes = hex!("0400000001000000160001000b43514c5f56455253494f4e0005332e302e30"); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - operation: CassandraOperation::Startup(vec![ + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 0, + vec![], + CassandraOperation::Startup(vec![ 0, 1, 0, 11, 67, 81, 76, 95, 86, 69, 82, 83, 73, 79, 78, 0, 5, 51, 46, 48, 46, 48, ]), - stream_id: 0, - tracing_id: None, - warnings: vec![], - }))]; + None, + )))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } @@ -297,14 +297,14 @@ mod cassandra_protocol_tests { fn test_codec_options() { let mut codec = CassandraCodec::new(); let bytes = hex!("040000000500000000"); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - operation: CassandraOperation::Options(vec![]), - stream_id: 0, - tracing_id: None, - warnings: vec![], - }))]; + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 0, + vec![], + CassandraOperation::Options(vec![]), + None, + )))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } @@ -312,14 +312,14 @@ mod cassandra_protocol_tests { fn test_codec_ready() { let mut codec = CassandraCodec::new(); let bytes = hex!("840000000200000000"); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - operation: CassandraOperation::Ready(vec![]), - stream_id: 0, - tracing_id: None, - warnings: vec![], - }))]; + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 0, + vec![], + CassandraOperation::Ready(vec![]), + None, + )))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } @@ -330,20 +330,20 @@ mod cassandra_protocol_tests { "040000010b000000310003000f544f504f4c4f47595f4348414e4745 000d5354415455535f4348414e4745000d534348454d415f4348414e4745" ); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - operation: CassandraOperation::Register(BodyReqRegister { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 1, + vec![], + CassandraOperation::Register(BodyReqRegister { events: vec![ SimpleServerEvent::TopologyChange, SimpleServerEvent::StatusChange, SimpleServerEvent::SchemaChange, ], }), - stream_id: 1, - tracing_id: None, - warnings: vec![], - }))]; + None, + )))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } @@ -356,10 +356,12 @@ mod cassandra_protocol_tests { 65727265645f6970001000047261636b000d000f72656c656173655f76657273696f6e000d000b7270635f616464726 573730010000e736368656d615f76657273696f6e000c0006746f6b656e730022000d00000000" ); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - operation: CassandraOperation::Result(CassandraResult::Rows { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 2, + vec![], + CassandraOperation::Result(CassandraResult::Rows { rows: vec![], metadata: Box::new(RowsMetadata { flags: RowsMetadataFlags::GLOBAL_TABLE_SPACE, @@ -449,10 +451,8 @@ mod cassandra_protocol_tests { ], }), }), - stream_id: 2, - tracing_id: None, - warnings: vec![], - }))]; + None, + )))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } @@ -464,19 +464,19 @@ mod cassandra_protocol_tests { 74656d2e6c6f63616c205748455245206b6579203d20276c6f63616c27000100" ); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - stream_id: 3, - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Query { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 3, + vec![], + CassandraOperation::Query { query: Box::new(parse_statement_single( "SELECT * FROM system.local WHERE key = 'local'", )), params: Box::new(QueryParams::default()), }, - }))]; + None, + )))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } @@ -488,19 +488,19 @@ mod cassandra_protocol_tests { 6d2e666f6f2028626172292056414c554553202827626172322729000100" ); - let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - stream_id: 3, - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Query { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 3, + vec![], + CassandraOperation::Query { query: Box::new(parse_statement_single( "INSERT INTO system.foo (bar) VALUES ('bar2')", )), params: Box::new(QueryParams::default()), }, - }))]; + None, + )))]; test_frame_codec_roundtrip(&mut codec, &bytes, messages); } } diff --git a/shotover-proxy/src/frame/cassandra.rs b/shotover-proxy/src/frame/cassandra.rs index 7184501ad..7efb6e028 100644 --- a/shotover-proxy/src/frame/cassandra.rs +++ b/shotover-proxy/src/frame/cassandra.rs @@ -98,25 +98,64 @@ pub struct CassandraMetadata { // missing `warnings` field because we are not using it currently } +#[derive(PartialEq, Debug, Clone, Copy)] +pub enum Tracing { + Request(bool), + Response(Option), +} + +#[allow(clippy::from_over_into)] +impl Into> for Tracing { + fn into(self) -> Option { + match self { + Self::Request(_) => None, + Self::Response(uuid) => uuid, + } + } +} + #[derive(PartialEq, Debug, Clone)] pub struct CassandraFrame { pub version: Version, pub flags: Flags, pub stream_id: StreamId, - pub tracing_id: Option, + pub tracing: Tracing, pub warnings: Vec, /// Contains the message body pub operation: CassandraOperation, } impl CassandraFrame { + pub fn new( + version: Version, + flags: Flags, + stream_id: StreamId, + warnings: Vec, + operation: CassandraOperation, + tracing_id: Option, + ) -> Self { + let tracing = match operation.to_direction() { + Direction::Request => Tracing::Request(flags.contains(Flags::TRACING)), + Direction::Response => Tracing::Response(tracing_id), + }; + + Self { + version, + flags, + stream_id, + warnings, + operation, + tracing, + } + } + /// Return `CassandraMetadata` from this `CassandraFrame` pub(crate) fn metadata(&self) -> CassandraMetadata { CassandraMetadata { version: self.version, flags: self.flags, stream_id: self.stream_id, - tracing_id: self.tracing_id, + tracing_id: self.tracing.into(), opcode: self.operation.to_opcode(), } } @@ -294,11 +333,16 @@ impl CassandraFrame { Opcode::AuthSuccess => CassandraOperation::AuthSuccess(frame.body), }; + let tracing = match operation.to_direction() { + Direction::Request => Tracing::Request(frame.flags.contains(Flags::TRACING)), + Direction::Response => Tracing::Response(frame.tracing_id), + }; + Ok(CassandraFrame { version: frame.version, flags: frame.flags, stream_id: frame.stream_id, - tracing_id: frame.tracing_id, + tracing, warnings: frame.warnings, operation, }) @@ -320,7 +364,7 @@ impl CassandraFrame { opcode: self.operation.to_opcode(), stream_id: self.stream_id, body: self.operation.into_body(self.version), - tracing_id: self.tracing_id, + tracing_id: self.tracing.into(), warnings: self.warnings, } } @@ -691,7 +735,8 @@ pub struct CassandraBatch { impl Display for CassandraFrame { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { write!(f, "{} stream:{}", self.version, self.stream_id)?; - if let Some(tracing_id) = self.tracing_id { + let tracing_id: Option = self.tracing.into(); + if let Some(tracing_id) = tracing_id { write!(f, " tracing_id:{}", tracing_id)?; } if !self.warnings.is_empty() { diff --git a/shotover-proxy/src/message/mod.rs b/shotover-proxy/src/message/mod.rs index cbf96f805..bcc1db2a3 100644 --- a/shotover-proxy/src/message/mod.rs +++ b/shotover-proxy/src/message/mod.rs @@ -213,17 +213,17 @@ impl Message { Frame::Redis(_) => Frame::Redis(RedisFrame::Error( "ERR Message was filtered out by shotover".into(), )), - Frame::Cassandra(frame) => Frame::Cassandra(CassandraFrame { - version: frame.version, - flags: frame.flags, - stream_id: frame.stream_id, - operation: CassandraOperation::Error(ErrorBody { + Frame::Cassandra(frame) => Frame::Cassandra(CassandraFrame::new( + frame.version, + frame.flags, + frame.stream_id, + vec![], + CassandraOperation::Error(ErrorBody { message: "Message was filtered out by shotover".into(), ty: ErrorType::Server, }), - tracing_id: frame.tracing_id, - warnings: vec![], - }), + frame.tracing.into(), + )), Frame::None => Frame::None, }) } @@ -243,17 +243,17 @@ impl Message { Metadata::Redis => { Frame::Redis(RedisFrame::Error(Str::from_inner(error.into()).unwrap())) } - Metadata::Cassandra(frame) => Frame::Cassandra(CassandraFrame { - version: frame.version, - flags: frame.flags, - stream_id: frame.stream_id, - operation: CassandraOperation::Error(ErrorBody { + Metadata::Cassandra(frame) => Frame::Cassandra(CassandraFrame::new( + frame.version, + frame.flags, + frame.stream_id, + vec![], + CassandraOperation::Error(ErrorBody { message: error, ty: ErrorType::Server, }), - tracing_id: frame.tracing_id, - warnings: vec![], - }), + frame.tracing_id, + )), Metadata::None => Frame::None, }); self.invalidate_cache(); @@ -291,14 +291,14 @@ impl Message { ty: ErrorType::Overloaded, }); - Frame::Cassandra(CassandraFrame { - version: metadata.version, - flags: metadata.flags, - stream_id: metadata.stream_id, - tracing_id: metadata.tracing_id, - warnings: vec![], - operation: body, - }) + Frame::Cassandra(CassandraFrame::new( + metadata.version, + metadata.flags, + metadata.stream_id, + vec![], + body, + metadata.tracing_id, + )) } Metadata::Redis => { unimplemented!() diff --git a/shotover-proxy/src/transforms/cassandra/peers_rewrite.rs b/shotover-proxy/src/transforms/cassandra/peers_rewrite.rs index 9cc0a2761..1b471c50b 100644 --- a/shotover-proxy/src/transforms/cassandra/peers_rewrite.rs +++ b/shotover-proxy/src/transforms/cassandra/peers_rewrite.rs @@ -146,13 +146,12 @@ mod test { use cassandra_protocol::query::QueryParams; fn create_query_message(query: &str) -> Message { - Message::from_frame(Frame::Cassandra(CassandraFrame { - flags: Flags::default(), - version: Version::V4, - stream_id: 0, - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Query { + Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 0, + vec![], + CassandraOperation::Query { query: Box::new(parse_statement_single(query)), params: Box::new(QueryParams { keyspace: None, @@ -166,17 +165,17 @@ mod test { timestamp: Some(1643855761086585), }), }, - })) + None, + ))) } fn create_response_message(col_specs: &[ColSpec], rows: Vec>) -> Message { - Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - stream_id: 0, - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Result(Rows { + Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 0, + vec![], + CassandraOperation::Result(Rows { rows, metadata: Box::new(RowsMetadata { flags: RowsMetadataFlags::GLOBAL_TABLE_SPACE, @@ -190,7 +189,8 @@ mod test { col_specs: col_specs.to_owned(), }), }), - })) + None, + ))) } #[test] diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs index ef530da82..2e0b41cc5 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs @@ -1,5 +1,5 @@ use crate::error::ChainResponse; -use crate::frame::cassandra::{parse_statement_single, CassandraMetadata}; +use crate::frame::cassandra::{parse_statement_single, CassandraMetadata, Tracing}; use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame}; use crate::message::{IntSize, Message, MessageValue, Messages}; use crate::tls::{TlsConnector, TlsConnectorConfig}; @@ -194,17 +194,17 @@ impl CassandraSinkCluster { fn create_query(messages: &Messages, query: &str, version: Version) -> Result { let stream_id = get_unused_stream_id(messages)?; - Ok(Message::from_frame(Frame::Cassandra(CassandraFrame { + Ok(Message::from_frame(Frame::Cassandra(CassandraFrame::new( version, - flags: Flags::default(), + Flags::default(), stream_id, - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Query { + vec![], + CassandraOperation::Query { query: Box::new(parse_statement_single(query)), params: Box::new(QueryParams::default()), }, - }))) + None, + )))) } impl CassandraSinkCluster { @@ -366,19 +366,19 @@ impl CassandraSinkCluster { .send(Response { original: message.clone(), response: Ok(Message::from_frame(Frame::Cassandra( - CassandraFrame { - operation: CassandraOperation::Error(ErrorBody { + CassandraFrame::new( + metadata.version, + metadata.flags.difference(Flags::TRACING), // we don't have a tracing id because we didn't actually hit a node + metadata.stream_id, + vec![], + CassandraOperation::Error(ErrorBody { message: "Shotover does not have this query's metadata. Please re-prepare on this Shotover host before sending again.".into(), ty: ErrorType::Unprepared(UnpreparedError { id, }), }), - flags: metadata.flags.difference(Flags::TRACING), // we don't have a tracing id because we didn't actually hit a node - stream_id: metadata.stream_id, - tracing_id: None, - version: metadata.version, - warnings: vec![], - }, + None, + ), ))), }).expect("the receiver is guaranteed to be alive, so this must succeed"); } @@ -879,7 +879,7 @@ fn get_execute_message(message: &mut Message) -> Option<(&BodyReqExecuteOwned, C version, flags, stream_id, - tracing_id, + tracing, .. })) = message.frame() { @@ -889,7 +889,7 @@ fn get_execute_message(message: &mut Message) -> Option<(&BodyReqExecuteOwned, C version: *version, flags: *flags, stream_id: *stream_id, - tracing_id: *tracing_id, + tracing_id: >>::into(*tracing), opcode: Opcode::Execute, }, )); diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs index ca1ed4944..b6cee4fb6 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs @@ -144,19 +144,19 @@ async fn register_for_topology_and_status_events( let (tx, rx) = oneshot::channel(); connection .send( - Message::from_frame(Frame::Cassandra(CassandraFrame { + Message::from_frame(Frame::Cassandra(CassandraFrame::new( version, - stream_id: 0, - flags: Flags::default(), - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Register(BodyReqRegister { + Flags::default(), + 0, + vec![], + CassandraOperation::Register(BodyReqRegister { events: vec![ SimpleServerEvent::TopologyChange, SimpleServerEvent::StatusChange, ], }), - })), + None, + ))), tx, ) .unwrap(); @@ -197,19 +197,19 @@ mod system_local { ) -> Result> { let (tx, rx) = oneshot::channel(); connection.send( - Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - stream_id: 1, - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Query { + Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 1, + vec![], + CassandraOperation::Query { query: Box::new(parse_statement_single( "SELECT rack, tokens, host_id, data_center FROM system.local", )), params: Box::new(QueryParams::default()), }, - })), + None, + ))), tx, )?; @@ -284,19 +284,19 @@ mod system_peers { ) -> Result> { let (tx, rx) = oneshot::channel(); connection.send( - Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - stream_id: 0, - flags: Flags::default(), - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Query { + Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 0, + vec![], + CassandraOperation::Query { query: Box::new(parse_statement_single( "SELECT native_port, native_address, rack, tokens, host_id, data_center FROM system.peers_v2", )), params: Box::new(QueryParams::default()), }, - })), + None + ))), tx, )?; @@ -305,19 +305,19 @@ mod system_peers { if is_peers_v2_does_not_exist_error(&mut response) { let (tx, rx) = oneshot::channel(); connection.send( - Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - stream_id: 0, - flags: Flags::default(), - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Query { + Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 0, + vec![], + CassandraOperation::Query { query: Box::new(parse_statement_single( "SELECT peer, rack, tokens, host_id, data_center FROM system.peers", )), params: Box::new(QueryParams::default()), }, - })), + None, + ))), tx, )?; response = rx.await?.response?; diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster.rs b/shotover-proxy/tests/cassandra_int_tests/cluster.rs index 65d478779..0c453bfa2 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster.rs @@ -45,23 +45,21 @@ pub async fn run_topology_task(ca_path: Option<&str>, port: Option) -> Vec< fn create_handshake() -> Vec { vec![ - Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - stream_id: 64, - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Startup(b"\0\x01\0\x0bCQL_VERSION\0\x053.0.0".to_vec()), - })), - Message::from_frame(Frame::Cassandra(CassandraFrame { - version: Version::V4, - flags: Flags::default(), - stream_id: 128, - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::AuthResponse( - b"\0\0\0\x14\0cassandra\0cassandra".to_vec(), - ), - })), + Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 64, + vec![], + CassandraOperation::Startup(b"\0\x01\0\x0bCQL_VERSION\0\x053.0.0".to_vec()), + None, + ))), + Message::from_frame(Frame::Cassandra(CassandraFrame::new( + Version::V4, + Flags::default(), + 128, + vec![], + CassandraOperation::AuthResponse(b"\0\0\0\x14\0cassandra\0cassandra".to_vec()), + None, + ))), ] }