From 6373b8d6fa785542dedba91229fadb269b9ed100 Mon Sep 17 00:00:00 2001 From: Conor Brosnan Date: Tue, 24 Jan 2023 17:44:39 +1000 Subject: [PATCH 1/3] startup body support --- shotover-proxy/src/codec/cassandra.rs | 8 ++++--- shotover-proxy/src/frame/cassandra.rs | 22 +++++++++++++++---- .../tests/cassandra_int_tests/cluster/mod.rs | 6 ++++- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/shotover-proxy/src/codec/cassandra.rs b/shotover-proxy/src/codec/cassandra.rs index b6b51b51e..67aa56f54 100644 --- a/shotover-proxy/src/codec/cassandra.rs +++ b/shotover-proxy/src/codec/cassandra.rs @@ -235,8 +235,10 @@ mod cassandra_protocol_tests { ColSpec, ColType, ColTypeOption, ColTypeOptionValue, RowsMetadata, RowsMetadataFlags, TableSpec, }; + use cassandra_protocol::frame::message_startup::BodyReqStartup; use cassandra_protocol::frame::Version; use hex_literal::hex; + use std::collections::HashMap; use tokio_util::codec::{Decoder, Encoder}; fn test_frame_codec_roundtrip( @@ -277,12 +279,12 @@ mod cassandra_protocol_tests { #[test] fn test_codec_startup() { let mut codec = CassandraCodec::new(); + let mut startup_body: HashMap = HashMap::new(); + startup_body.insert("CQL_VERSION".into(), "3.0.0".into()); let bytes = hex!("0400000001000000160001000b43514c5f56455253494f4e0005332e302e30"); let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { version: Version::V4, - operation: CassandraOperation::Startup(vec![ - 0, 1, 0, 11, 67, 81, 76, 95, 86, 69, 82, 83, 73, 79, 78, 0, 5, 51, 46, 48, 46, 48, - ]), + operation: CassandraOperation::Startup(BodyReqStartup { map: startup_body }), stream_id: 0, tracing: Tracing::Request(false), warnings: vec![], diff --git a/shotover-proxy/src/frame/cassandra.rs b/shotover-proxy/src/frame/cassandra.rs index 7ceee7e64..ec3c3f507 100644 --- a/shotover-proxy/src/frame/cassandra.rs +++ b/shotover-proxy/src/frame/cassandra.rs @@ -19,6 +19,8 @@ use cassandra_protocol::frame::message_result::{ BodyResResultPrepared, BodyResResultSetKeyspace, ColSpec, ColTypeOption, ResResultBody, ResultKind, RowsMetadata, RowsMetadataFlags, }; +use cassandra_protocol::frame::message_startup::BodyReqStartup; +use cassandra_protocol::frame::message_supported::BodyResSupported; use cassandra_protocol::frame::{ Direction, Envelope as RawCassandraFrame, Flags, Opcode, Serialize, StreamId, Version, }; @@ -276,11 +278,23 @@ impl CassandraFrame { unreachable!("We already know the operation is an error") } } - Opcode::Startup => CassandraOperation::Startup(frame.body), + Opcode::Startup => { + if let RequestBody::Startup(body) = frame.request_body()? { + CassandraOperation::Startup(body) + } else { + unreachable!("We already know the operation is a startup") + } + } Opcode::Ready => CassandraOperation::Ready(frame.body), Opcode::Authenticate => CassandraOperation::Authenticate(frame.body), Opcode::Options => CassandraOperation::Options(frame.body), - Opcode::Supported => CassandraOperation::Supported(frame.body), + Opcode::Supported => { + if let ResponseBody::Supported(body) = frame.response_body()? { + CassandraOperation::Supported(body) + } else { + unreachable!("we already know this is a supported"); + } + } Opcode::Prepare => CassandraOperation::Prepare(frame.body), Opcode::Execute => { if let RequestBody::Execute(body) = frame.request_body()? { @@ -417,11 +431,11 @@ pub enum CassandraOperation { Event(ServerEvent), Batch(CassandraBatch), // operations for protocol negotiation, should be ignored by transforms - Startup(Vec), + Startup(BodyReqStartup), Ready(Vec), Authenticate(Vec), Options(Vec), - Supported(Vec), + Supported(BodyResSupported), AuthChallenge(Vec), AuthResponse(Vec), AuthSuccess(Vec), diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster/mod.rs b/shotover-proxy/tests/cassandra_int_tests/cluster/mod.rs index 0f01274c6..7148a9fd2 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster/mod.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster/mod.rs @@ -1,3 +1,4 @@ +use cassandra_protocol::frame::message_startup::BodyReqStartup; use cassandra_protocol::frame::Version; use shotover_proxy::frame::{cassandra::Tracing, CassandraFrame, CassandraOperation, Frame}; use shotover_proxy::message::Message; @@ -57,13 +58,16 @@ pub async fn run_topology_task(ca_path: Option<&str>, port: Option) -> Vec< } fn create_handshake() -> Vec { + let mut startup_body: HashMap = HashMap::new(); + startup_body.insert("CQL_VERSION".into(), "3.0.0".into()); + vec![ Message::from_frame(Frame::Cassandra(CassandraFrame { version: Version::V4, stream_id: 64, tracing: Tracing::Request(false), warnings: vec![], - operation: CassandraOperation::Startup(b"\0\x01\0\x0bCQL_VERSION\0\x053.0.0".to_vec()), + operation: CassandraOperation::Startup(BodyReqStartup { map: startup_body }), })), Message::from_frame(Frame::Cassandra(CassandraFrame { version: Version::V4, From d09051872fa1f37638bd5e86b456ec2a29288928 Mon Sep 17 00:00:00 2001 From: Conor Brosnan Date: Tue, 24 Jan 2023 19:16:34 +1000 Subject: [PATCH 2/3] Reject STARTUP messages if they attempt to use compression. --- shotover-proxy/src/codec/cassandra.rs | 29 +++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/shotover-proxy/src/codec/cassandra.rs b/shotover-proxy/src/codec/cassandra.rs index 67aa56f54..6c9042472 100644 --- a/shotover-proxy/src/codec/cassandra.rs +++ b/shotover-proxy/src/codec/cassandra.rs @@ -73,6 +73,15 @@ impl Decoder for CassandraCodec { let mut message = Message::from_bytes(bytes.freeze(), MessageType::Cassandra); + // if is startup message, reject compression because shotover does not support + if let Some(Frame::Cassandra(frame)) = message.frame() { + if let CassandraOperation::Startup(startup) = &mut frame.operation { + if let Some(compression) = startup.map.get("COMPRESSION") { + return Err(reject_compression(compression)); + } + } + } + if let Ok(Metadata::Cassandra(CassandraMetadata { opcode: Opcode::Query | Opcode::Batch, .. @@ -196,6 +205,26 @@ fn reject_protocol_version(version: u8) -> CodecReadError { ))]) } +fn reject_compression(compression: &String) -> CodecReadError { + info!( + "Rejecting compression option {} (configure the client to use no compression)", + compression + ); + + CodecReadError::RespondAndThenCloseConnection(vec![Message::from_frame(Frame::Cassandra( + CassandraFrame { + version: Version::V4, + stream_id: 0, + operation: CassandraOperation::Error(ErrorBody { + message: format!("Unsupported compression type {}", compression), + ty: ErrorType::Protocol, + }), + tracing: Tracing::Response(None), + warnings: vec![], + }, + ))]) +} + impl Encoder for CassandraCodec { type Error = anyhow::Error; From 5d5d7d3de21ea700549765b007ab590099e6ea8c Mon Sep 17 00:00:00 2001 From: Conor Brosnan Date: Mon, 30 Jan 2023 13:11:33 +1000 Subject: [PATCH 3/3] review feedback --- shotover-proxy/src/codec/cassandra.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/shotover-proxy/src/codec/cassandra.rs b/shotover-proxy/src/codec/cassandra.rs index 6c9042472..e36e2f279 100644 --- a/shotover-proxy/src/codec/cassandra.rs +++ b/shotover-proxy/src/codec/cassandra.rs @@ -77,7 +77,7 @@ impl Decoder for CassandraCodec { if let Some(Frame::Cassandra(frame)) = message.frame() { if let CassandraOperation::Startup(startup) = &mut frame.operation { if let Some(compression) = startup.map.get("COMPRESSION") { - return Err(reject_compression(compression)); + return Err(reject_compression(frame.stream_id, compression)); } } } @@ -205,7 +205,7 @@ fn reject_protocol_version(version: u8) -> CodecReadError { ))]) } -fn reject_compression(compression: &String) -> CodecReadError { +fn reject_compression(stream_id: i16, compression: &String) -> CodecReadError { info!( "Rejecting compression option {} (configure the client to use no compression)", compression @@ -214,7 +214,7 @@ fn reject_compression(compression: &String) -> CodecReadError { CodecReadError::RespondAndThenCloseConnection(vec![Message::from_frame(Frame::Cassandra( CassandraFrame { version: Version::V4, - stream_id: 0, + stream_id, operation: CassandraOperation::Error(ErrorBody { message: format!("Unsupported compression type {}", compression), ty: ErrorType::Protocol,