From 6373b8d6fa785542dedba91229fadb269b9ed100 Mon Sep 17 00:00:00 2001 From: Conor Brosnan Date: Tue, 24 Jan 2023 17:44:39 +1000 Subject: [PATCH 1/3] startup body support --- shotover-proxy/src/codec/cassandra.rs | 8 ++++--- shotover-proxy/src/frame/cassandra.rs | 22 +++++++++++++++---- .../tests/cassandra_int_tests/cluster/mod.rs | 6 ++++- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/shotover-proxy/src/codec/cassandra.rs b/shotover-proxy/src/codec/cassandra.rs index b6b51b51e..67aa56f54 100644 --- a/shotover-proxy/src/codec/cassandra.rs +++ b/shotover-proxy/src/codec/cassandra.rs @@ -235,8 +235,10 @@ mod cassandra_protocol_tests { ColSpec, ColType, ColTypeOption, ColTypeOptionValue, RowsMetadata, RowsMetadataFlags, TableSpec, }; + use cassandra_protocol::frame::message_startup::BodyReqStartup; use cassandra_protocol::frame::Version; use hex_literal::hex; + use std::collections::HashMap; use tokio_util::codec::{Decoder, Encoder}; fn test_frame_codec_roundtrip( @@ -277,12 +279,12 @@ mod cassandra_protocol_tests { #[test] fn test_codec_startup() { let mut codec = CassandraCodec::new(); + let mut startup_body: HashMap = HashMap::new(); + startup_body.insert("CQL_VERSION".into(), "3.0.0".into()); let bytes = hex!("0400000001000000160001000b43514c5f56455253494f4e0005332e302e30"); let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { version: Version::V4, - operation: CassandraOperation::Startup(vec![ - 0, 1, 0, 11, 67, 81, 76, 95, 86, 69, 82, 83, 73, 79, 78, 0, 5, 51, 46, 48, 46, 48, - ]), + operation: CassandraOperation::Startup(BodyReqStartup { map: startup_body }), stream_id: 0, tracing: Tracing::Request(false), warnings: vec![], diff --git a/shotover-proxy/src/frame/cassandra.rs b/shotover-proxy/src/frame/cassandra.rs index 7ceee7e64..ec3c3f507 100644 --- a/shotover-proxy/src/frame/cassandra.rs +++ b/shotover-proxy/src/frame/cassandra.rs @@ -19,6 +19,8 @@ use cassandra_protocol::frame::message_result::{ BodyResResultPrepared, BodyResResultSetKeyspace, ColSpec, ColTypeOption, ResResultBody, ResultKind, RowsMetadata, RowsMetadataFlags, }; +use cassandra_protocol::frame::message_startup::BodyReqStartup; +use cassandra_protocol::frame::message_supported::BodyResSupported; use cassandra_protocol::frame::{ Direction, Envelope as RawCassandraFrame, Flags, Opcode, Serialize, StreamId, Version, }; @@ -276,11 +278,23 @@ impl CassandraFrame { unreachable!("We already know the operation is an error") } } - Opcode::Startup => CassandraOperation::Startup(frame.body), + Opcode::Startup => { + if let RequestBody::Startup(body) = frame.request_body()? { + CassandraOperation::Startup(body) + } else { + unreachable!("We already know the operation is a startup") + } + } Opcode::Ready => CassandraOperation::Ready(frame.body), Opcode::Authenticate => CassandraOperation::Authenticate(frame.body), Opcode::Options => CassandraOperation::Options(frame.body), - Opcode::Supported => CassandraOperation::Supported(frame.body), + Opcode::Supported => { + if let ResponseBody::Supported(body) = frame.response_body()? { + CassandraOperation::Supported(body) + } else { + unreachable!("we already know this is a supported"); + } + } Opcode::Prepare => CassandraOperation::Prepare(frame.body), Opcode::Execute => { if let RequestBody::Execute(body) = frame.request_body()? { @@ -417,11 +431,11 @@ pub enum CassandraOperation { Event(ServerEvent), Batch(CassandraBatch), // operations for protocol negotiation, should be ignored by transforms - Startup(Vec), + Startup(BodyReqStartup), Ready(Vec), Authenticate(Vec), Options(Vec), - Supported(Vec), + Supported(BodyResSupported), AuthChallenge(Vec), AuthResponse(Vec), AuthSuccess(Vec), diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster/mod.rs b/shotover-proxy/tests/cassandra_int_tests/cluster/mod.rs index 0f01274c6..7148a9fd2 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster/mod.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster/mod.rs @@ -1,3 +1,4 @@ +use cassandra_protocol::frame::message_startup::BodyReqStartup; use cassandra_protocol::frame::Version; use shotover_proxy::frame::{cassandra::Tracing, CassandraFrame, CassandraOperation, Frame}; use shotover_proxy::message::Message; @@ -57,13 +58,16 @@ pub async fn run_topology_task(ca_path: Option<&str>, port: Option) -> Vec< } fn create_handshake() -> Vec { + let mut startup_body: HashMap = HashMap::new(); + startup_body.insert("CQL_VERSION".into(), "3.0.0".into()); + vec![ Message::from_frame(Frame::Cassandra(CassandraFrame { version: Version::V4, stream_id: 64, tracing: Tracing::Request(false), warnings: vec![], - operation: CassandraOperation::Startup(b"\0\x01\0\x0bCQL_VERSION\0\x053.0.0".to_vec()), + operation: CassandraOperation::Startup(BodyReqStartup { map: startup_body }), })), Message::from_frame(Frame::Cassandra(CassandraFrame { version: Version::V4, From aac455b08484c11bef8c8e2b66d69a43fbbc172a Mon Sep 17 00:00:00 2001 From: Conor Brosnan Date: Tue, 24 Jan 2023 19:38:37 +1000 Subject: [PATCH 2/3] options rewrite transform --- .../src/transforms/cassandra/mod.rs | 1 + .../transforms/cassandra/options_rewrite.rs | 64 +++++++++++++++++++ shotover-proxy/src/transforms/mod.rs | 25 ++++++-- .../tests/cassandra_int_tests/mod.rs | 59 +++++++++++++++++ .../docker-compose.yaml | 14 ++++ .../cassandra-options-rewrite/topology.yaml | 24 +++++++ 6 files changed, 182 insertions(+), 5 deletions(-) create mode 100644 shotover-proxy/src/transforms/cassandra/options_rewrite.rs create mode 100644 shotover-proxy/tests/test-configs/cassandra-options-rewrite/docker-compose.yaml create mode 100644 shotover-proxy/tests/test-configs/cassandra-options-rewrite/topology.yaml diff --git a/shotover-proxy/src/transforms/cassandra/mod.rs b/shotover-proxy/src/transforms/cassandra/mod.rs index 5c935ce91..037c6b452 100644 --- a/shotover-proxy/src/transforms/cassandra/mod.rs +++ b/shotover-proxy/src/transforms/cassandra/mod.rs @@ -1,4 +1,5 @@ mod connection; +pub mod options_rewrite; pub mod peers_rewrite; pub mod sink_cluster; pub mod sink_single; diff --git a/shotover-proxy/src/transforms/cassandra/options_rewrite.rs b/shotover-proxy/src/transforms/cassandra/options_rewrite.rs new file mode 100644 index 000000000..4c417f22d --- /dev/null +++ b/shotover-proxy/src/transforms/cassandra/options_rewrite.rs @@ -0,0 +1,64 @@ +use crate::frame::{CassandraOperation, Frame}; +use crate::{ + error::ChainResponse, + transforms::{Transform, TransformBuilder, Wrapper}, +}; +use anyhow::Result; +use async_trait::async_trait; +use cassandra_protocol::frame::message_supported::BodyResSupported; +use serde::Deserialize; +use std::collections::HashMap; + +#[derive(Deserialize, Debug, Clone)] +pub struct CassandraOptionsRewriteConfig { + pub map: HashMap>, +} + +impl CassandraOptionsRewriteConfig { + pub async fn get_transform(&self) -> Result { + Ok(TransformBuilder::CassandraOptionsRewrite( + CassandraOptionsRewrite::new(self.map.clone()), + )) + } +} + +#[derive(Clone)] +pub struct CassandraOptionsRewrite { + map: HashMap>, +} + +impl CassandraOptionsRewrite { + pub fn new(map: HashMap>) -> Self { + CassandraOptionsRewrite { map } + } +} + +#[async_trait] +impl Transform for CassandraOptionsRewrite { + async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse { + let mut response = message_wrapper.call_next_transform().await?; + for message in &mut response { + let mut invalidate = false; + if let Some(Frame::Cassandra(frame)) = message.frame() { + if let CassandraOperation::Supported(BodyResSupported { data }) = + &mut frame.operation + { + for key in self.map.keys() { + data.insert(key.clone(), self.map.get(key).unwrap().clone()); + } + invalidate = true; + } + } + if invalidate { + message.invalidate_cache(); + } + } + + Ok(response) + } + + async fn transform_pushed<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse { + let response = message_wrapper.call_next_transform_pushed().await?; + Ok(response) + } +} diff --git a/shotover-proxy/src/transforms/mod.rs b/shotover-proxy/src/transforms/mod.rs index 0640bf8a5..1b8106d2e 100644 --- a/shotover-proxy/src/transforms/mod.rs +++ b/shotover-proxy/src/transforms/mod.rs @@ -1,10 +1,14 @@ -use self::cassandra::sink_cluster::CassandraSinkClusterBuilder; use crate::error::ChainResponse; use crate::message::Messages; -use crate::transforms::cassandra::peers_rewrite::CassandraPeersRewrite; -use crate::transforms::cassandra::peers_rewrite::CassandraPeersRewriteConfig; -use crate::transforms::cassandra::sink_cluster::CassandraSinkCluster; -use crate::transforms::cassandra::sink_cluster::CassandraSinkClusterConfig; +use crate::transforms::cassandra::options_rewrite::{ + CassandraOptionsRewrite, CassandraOptionsRewriteConfig, +}; +use crate::transforms::cassandra::peers_rewrite::{ + CassandraPeersRewrite, CassandraPeersRewriteConfig, +}; +use crate::transforms::cassandra::sink_cluster::{ + CassandraSinkCluster, CassandraSinkClusterBuilder, CassandraSinkClusterConfig, +}; use crate::transforms::cassandra::sink_single::{CassandraSinkSingle, CassandraSinkSingleConfig}; use crate::transforms::chain::{TransformChain, TransformChainBuilder}; use crate::transforms::coalesce::{Coalesce, CoalesceConfig}; @@ -80,6 +84,7 @@ pub enum TransformBuilder { CassandraSinkCluster(Box), RedisSinkSingle(RedisSinkSingle), CassandraPeersRewrite(CassandraPeersRewrite), + CassandraOptionsRewrite(CassandraOptionsRewrite), RedisCache(SimpleRedisCacheBuilder), Tee(Tee), Null(Null), @@ -110,6 +115,7 @@ impl TransformBuilder { Transforms::CassandraSinkCluster(t.build()) } TransformBuilder::CassandraPeersRewrite(t) => Transforms::CassandraPeersRewrite(t), + TransformBuilder::CassandraOptionsRewrite(t) => Transforms::CassandraOptionsRewrite(t), TransformBuilder::RedisCache(t) => Transforms::RedisCache(t.build()), TransformBuilder::Tee(t) => Transforms::Tee(t), TransformBuilder::RedisSinkSingle(t) => Transforms::RedisSinkSingle(t), @@ -145,6 +151,7 @@ impl TransformBuilder { TransformBuilder::CassandraSinkSingle(c) => c.validate(), TransformBuilder::CassandraSinkCluster(c) => c.validate(), TransformBuilder::CassandraPeersRewrite(c) => c.validate(), + TransformBuilder::CassandraOptionsRewrite(c) => c.validate(), TransformBuilder::RedisCache(r) => r.validate(), TransformBuilder::Tee(t) => t.validate(), TransformBuilder::RedisSinkSingle(r) => r.validate(), @@ -174,6 +181,7 @@ impl TransformBuilder { TransformBuilder::CassandraSinkSingle(c) => c.is_terminating(), TransformBuilder::CassandraSinkCluster(c) => c.is_terminating(), TransformBuilder::CassandraPeersRewrite(c) => c.is_terminating(), + TransformBuilder::CassandraOptionsRewrite(c) => c.is_terminating(), TransformBuilder::RedisCache(r) => r.is_terminating(), TransformBuilder::Tee(t) => t.is_terminating(), TransformBuilder::RedisSinkSingle(r) => r.is_terminating(), @@ -215,6 +223,7 @@ pub enum Transforms { CassandraSinkCluster(Box), RedisSinkSingle(RedisSinkSingle), CassandraPeersRewrite(CassandraPeersRewrite), + CassandraOptionsRewrite(CassandraOptionsRewrite), RedisCache(SimpleRedisCache), Tee(Tee), Null(Null), @@ -249,6 +258,7 @@ impl Transforms { Transforms::CassandraSinkSingle(c) => c.transform(message_wrapper).await, Transforms::CassandraSinkCluster(c) => c.transform(message_wrapper).await, Transforms::CassandraPeersRewrite(c) => c.transform(message_wrapper).await, + Transforms::CassandraOptionsRewrite(c) => c.transform(message_wrapper).await, Transforms::RedisCache(r) => r.transform(message_wrapper).await, Transforms::Tee(m) => m.transform(message_wrapper).await, Transforms::DebugPrinter(p) => p.transform(message_wrapper).await, @@ -277,6 +287,7 @@ impl Transforms { match self { Transforms::CassandraSinkSingle(c) => c.transform_pushed(message_wrapper).await, Transforms::CassandraSinkCluster(c) => c.transform_pushed(message_wrapper).await, + Transforms::CassandraOptionsRewrite(c) => c.transform_pushed(message_wrapper).await, Transforms::CassandraPeersRewrite(c) => c.transform_pushed(message_wrapper).await, Transforms::RedisCache(r) => r.transform_pushed(message_wrapper).await, Transforms::Tee(m) => m.transform_pushed(message_wrapper).await, @@ -311,6 +322,7 @@ impl Transforms { Transforms::CassandraSinkSingle(a) => a.prep_transform_chain(t).await, Transforms::CassandraSinkCluster(a) => a.prep_transform_chain(t).await, Transforms::CassandraPeersRewrite(c) => c.prep_transform_chain(t).await, + Transforms::CassandraOptionsRewrite(c) => c.prep_transform_chain(t).await, Transforms::RedisSinkSingle(a) => a.prep_transform_chain(t).await, Transforms::RedisCache(a) => a.prep_transform_chain(t).await, Transforms::Tee(a) => a.prep_transform_chain(t).await, @@ -340,6 +352,7 @@ impl Transforms { Transforms::CassandraSinkSingle(c) => c.set_pushed_messages_tx(pushed_messages_tx), Transforms::CassandraSinkCluster(c) => c.set_pushed_messages_tx(pushed_messages_tx), Transforms::CassandraPeersRewrite(c) => c.set_pushed_messages_tx(pushed_messages_tx), + Transforms::CassandraOptionsRewrite(c) => c.set_pushed_messages_tx(pushed_messages_tx), Transforms::RedisCache(r) => r.set_pushed_messages_tx(pushed_messages_tx), Transforms::Tee(t) => t.set_pushed_messages_tx(pushed_messages_tx), Transforms::RedisSinkSingle(r) => r.set_pushed_messages_tx(pushed_messages_tx), @@ -373,6 +386,7 @@ pub enum TransformsConfig { CassandraSinkCluster(CassandraSinkClusterConfig), RedisSinkSingle(RedisSinkSingleConfig), CassandraPeersRewrite(CassandraPeersRewriteConfig), + CassandraOptionsRewrite(CassandraOptionsRewriteConfig), RedisCache(RedisConfig), Tee(TeeConfig), ConsistentScatter(ConsistentScatterConfig), @@ -406,6 +420,7 @@ impl TransformsConfig { TransformsConfig::CassandraSinkSingle(c) => c.get_transform(chain_name).await, TransformsConfig::CassandraSinkCluster(c) => c.get_transform(chain_name).await, TransformsConfig::CassandraPeersRewrite(c) => c.get_transform().await, + TransformsConfig::CassandraOptionsRewrite(c) => c.get_transform().await, TransformsConfig::RedisCache(r) => r.get_transform().await, TransformsConfig::Tee(t) => t.get_transform().await, TransformsConfig::RedisSinkSingle(r) => r.get_transform(chain_name).await, diff --git a/shotover-proxy/tests/cassandra_int_tests/mod.rs b/shotover-proxy/tests/cassandra_int_tests/mod.rs index 8da93ae7a..188da1bfc 100644 --- a/shotover-proxy/tests/cassandra_int_tests/mod.rs +++ b/shotover-proxy/tests/cassandra_int_tests/mod.rs @@ -1,8 +1,14 @@ use crate::helpers::ShotoverManager; use cassandra_protocol::frame::message_error::{ErrorBody, ErrorType}; +use cdrs_tokio::cluster::send_envelope::send_envelope; use cdrs_tokio::frame::events::{ SchemaChange, SchemaChangeOptions, SchemaChangeTarget, SchemaChangeType, ServerEvent, }; +use cdrs_tokio::frame::message_response::ResponseBody; +use cdrs_tokio::frame::message_supported::BodyResSupported; +use cdrs_tokio::frame::Envelope; +use cdrs_tokio::frame::Version; +use cdrs_tokio::retry::DefaultRetrySession; use futures::future::join_all; use futures::Future; use metrics_util::debugging::DebuggingRecorder; @@ -576,6 +582,59 @@ async fn peers_rewrite_v3(#[case] driver: CassandraDriver) { shotover.shutdown_and_then_consume_events(&[]).await; } +#[rstest] +#[case::cdrs(CdrsTokio)] +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn test_options_rewrite(#[case] driver: CassandraDriver) { + let _docker_compose = + DockerCompose::new("tests/test-configs/cassandra-options-rewrite/docker-compose.yaml"); + + let _shotover_manager = ShotoverManager::from_topology_file( + "tests/test-configs/cassandra-options-rewrite/topology.yaml", + ); + + let envelope = Envelope::new_req_options(Version::V4); + + let normal_connection = CassandraConnection::new("127.0.0.1", 9043, driver).await; + let normal_query_plan = normal_connection.as_cdrs().query_plan(None); + let normal_result = send_envelope( + normal_query_plan.into_iter(), + &envelope, + false, + Box::::default(), + ) + .await + .unwrap() + .unwrap() + .response_body() + .unwrap(); + if let ResponseBody::Supported(BodyResSupported { data }) = normal_result { + assert_eq!(*data.get("CQL_VERSION").unwrap(), vec!["3.4.5".to_string()]); + } + + let options_rewrite_connection = CassandraConnection::new("127.0.0.1", 9044, driver).await; + let options_rewrite_query_plan = options_rewrite_connection.as_cdrs().query_plan(None); + let options_rewrite_result = send_envelope( + options_rewrite_query_plan.into_iter(), + &envelope, + false, + Box::::default(), + ) + .await + .unwrap() + .unwrap() + .response_body() + .unwrap(); + + if let ResponseBody::Supported(BodyResSupported { data }) = options_rewrite_result { + assert_eq!( + *data.get("CQL_VERSION").unwrap(), + vec!["changed".to_string()] + ); + } +} + #[rstest] //#[case::cdrs(CdrsTokio)] // TODO: cdrs-tokio seems to be sending extra messages triggering the rate limiter #[case::scylla(Scylla)] diff --git a/shotover-proxy/tests/test-configs/cassandra-options-rewrite/docker-compose.yaml b/shotover-proxy/tests/test-configs/cassandra-options-rewrite/docker-compose.yaml new file mode 100644 index 000000000..47c6b152c --- /dev/null +++ b/shotover-proxy/tests/test-configs/cassandra-options-rewrite/docker-compose.yaml @@ -0,0 +1,14 @@ +version: "3.3" +services: + cassandra-one: + image: shotover-int-tests/cassandra:4.0.6 + ports: + - "9042:9042" + environment: + MAX_HEAP_SIZE: "400M" + MIN_HEAP_SIZE: "400M" + HEAP_NEWSIZE: "48M" + volumes: + - type: tmpfs + target: /var/lib/cassandra + command: cassandra -f -Dcassandra.skip_wait_for_gossip_to_settle=0 -Dcassandra.initial_token=0 diff --git a/shotover-proxy/tests/test-configs/cassandra-options-rewrite/topology.yaml b/shotover-proxy/tests/test-configs/cassandra-options-rewrite/topology.yaml new file mode 100644 index 000000000..ceef237db --- /dev/null +++ b/shotover-proxy/tests/test-configs/cassandra-options-rewrite/topology.yaml @@ -0,0 +1,24 @@ +--- +sources: + cassandra_prod_1: + Cassandra: + listen_addr: "127.0.0.1:9043" + cassandra_prod_2: + Cassandra: + listen_addr: "127.0.0.1:9044" + +chain_config: + main_chain: + - CassandraSinkSingle: + remote_address: "127.0.0.1:9042" + connect_timeout_ms: 3000 + options_rewrite: + - CassandraOptionsRewrite: + map: + CQL_VERSION: ["changed"] + - CassandraSinkSingle: + remote_address: "127.0.0.1:9042" + connect_timeout_ms: 3000 +source_to_chain_mapping: + cassandra_prod_1: main_chain + cassandra_prod_2: options_rewrite From a4ca5e50a7e1f19ded8327c52587f78e3636e886 Mon Sep 17 00:00:00 2001 From: Conor Brosnan Date: Mon, 30 Jan 2023 19:04:51 +1000 Subject: [PATCH 3/3] protocol level instead of transform --- shotover-proxy/src/codec/cassandra.rs | 11 ++++ .../src/transforms/cassandra/mod.rs | 1 - .../transforms/cassandra/options_rewrite.rs | 64 ------------------- shotover-proxy/src/transforms/mod.rs | 14 ---- .../tests/cassandra_int_tests/mod.rs | 59 ----------------- .../docker-compose.yaml | 14 ---- .../cassandra-options-rewrite/topology.yaml | 24 ------- test-helpers/src/connection/cassandra.rs | 1 + 8 files changed, 12 insertions(+), 176 deletions(-) delete mode 100644 shotover-proxy/src/transforms/cassandra/options_rewrite.rs delete mode 100644 shotover-proxy/tests/test-configs/cassandra-options-rewrite/docker-compose.yaml delete mode 100644 shotover-proxy/tests/test-configs/cassandra-options-rewrite/topology.yaml diff --git a/shotover-proxy/src/codec/cassandra.rs b/shotover-proxy/src/codec/cassandra.rs index e36e2f279..515988b90 100644 --- a/shotover-proxy/src/codec/cassandra.rs +++ b/shotover-proxy/src/codec/cassandra.rs @@ -6,6 +6,7 @@ use anyhow::{anyhow, Result}; use bytes::{Buf, BufMut, BytesMut}; use cassandra_protocol::compression::Compression; use cassandra_protocol::frame::message_error::{ErrorBody, ErrorType}; +use cassandra_protocol::frame::message_supported::BodyResSupported; use cassandra_protocol::frame::{ CheckEnvelopeSizeError, Envelope as RawCassandraFrame, Opcode, Version, }; @@ -80,6 +81,16 @@ impl Decoder for CassandraCodec { return Err(reject_compression(frame.stream_id, compression)); } } + + if let CassandraOperation::Supported(BodyResSupported { data }) = + &mut frame.operation + { + if let Some(value) = data.get_mut("COMPRESSION") { + *value = vec![]; + } + + message.invalidate_cache(); + } } if let Ok(Metadata::Cassandra(CassandraMetadata { diff --git a/shotover-proxy/src/transforms/cassandra/mod.rs b/shotover-proxy/src/transforms/cassandra/mod.rs index 037c6b452..5c935ce91 100644 --- a/shotover-proxy/src/transforms/cassandra/mod.rs +++ b/shotover-proxy/src/transforms/cassandra/mod.rs @@ -1,5 +1,4 @@ mod connection; -pub mod options_rewrite; pub mod peers_rewrite; pub mod sink_cluster; pub mod sink_single; diff --git a/shotover-proxy/src/transforms/cassandra/options_rewrite.rs b/shotover-proxy/src/transforms/cassandra/options_rewrite.rs deleted file mode 100644 index 4c417f22d..000000000 --- a/shotover-proxy/src/transforms/cassandra/options_rewrite.rs +++ /dev/null @@ -1,64 +0,0 @@ -use crate::frame::{CassandraOperation, Frame}; -use crate::{ - error::ChainResponse, - transforms::{Transform, TransformBuilder, Wrapper}, -}; -use anyhow::Result; -use async_trait::async_trait; -use cassandra_protocol::frame::message_supported::BodyResSupported; -use serde::Deserialize; -use std::collections::HashMap; - -#[derive(Deserialize, Debug, Clone)] -pub struct CassandraOptionsRewriteConfig { - pub map: HashMap>, -} - -impl CassandraOptionsRewriteConfig { - pub async fn get_transform(&self) -> Result { - Ok(TransformBuilder::CassandraOptionsRewrite( - CassandraOptionsRewrite::new(self.map.clone()), - )) - } -} - -#[derive(Clone)] -pub struct CassandraOptionsRewrite { - map: HashMap>, -} - -impl CassandraOptionsRewrite { - pub fn new(map: HashMap>) -> Self { - CassandraOptionsRewrite { map } - } -} - -#[async_trait] -impl Transform for CassandraOptionsRewrite { - async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse { - let mut response = message_wrapper.call_next_transform().await?; - for message in &mut response { - let mut invalidate = false; - if let Some(Frame::Cassandra(frame)) = message.frame() { - if let CassandraOperation::Supported(BodyResSupported { data }) = - &mut frame.operation - { - for key in self.map.keys() { - data.insert(key.clone(), self.map.get(key).unwrap().clone()); - } - invalidate = true; - } - } - if invalidate { - message.invalidate_cache(); - } - } - - Ok(response) - } - - async fn transform_pushed<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse { - let response = message_wrapper.call_next_transform_pushed().await?; - Ok(response) - } -} diff --git a/shotover-proxy/src/transforms/mod.rs b/shotover-proxy/src/transforms/mod.rs index 1b8106d2e..39d4d4006 100644 --- a/shotover-proxy/src/transforms/mod.rs +++ b/shotover-proxy/src/transforms/mod.rs @@ -1,8 +1,5 @@ use crate::error::ChainResponse; use crate::message::Messages; -use crate::transforms::cassandra::options_rewrite::{ - CassandraOptionsRewrite, CassandraOptionsRewriteConfig, -}; use crate::transforms::cassandra::peers_rewrite::{ CassandraPeersRewrite, CassandraPeersRewriteConfig, }; @@ -84,7 +81,6 @@ pub enum TransformBuilder { CassandraSinkCluster(Box), RedisSinkSingle(RedisSinkSingle), CassandraPeersRewrite(CassandraPeersRewrite), - CassandraOptionsRewrite(CassandraOptionsRewrite), RedisCache(SimpleRedisCacheBuilder), Tee(Tee), Null(Null), @@ -115,7 +111,6 @@ impl TransformBuilder { Transforms::CassandraSinkCluster(t.build()) } TransformBuilder::CassandraPeersRewrite(t) => Transforms::CassandraPeersRewrite(t), - TransformBuilder::CassandraOptionsRewrite(t) => Transforms::CassandraOptionsRewrite(t), TransformBuilder::RedisCache(t) => Transforms::RedisCache(t.build()), TransformBuilder::Tee(t) => Transforms::Tee(t), TransformBuilder::RedisSinkSingle(t) => Transforms::RedisSinkSingle(t), @@ -151,7 +146,6 @@ impl TransformBuilder { TransformBuilder::CassandraSinkSingle(c) => c.validate(), TransformBuilder::CassandraSinkCluster(c) => c.validate(), TransformBuilder::CassandraPeersRewrite(c) => c.validate(), - TransformBuilder::CassandraOptionsRewrite(c) => c.validate(), TransformBuilder::RedisCache(r) => r.validate(), TransformBuilder::Tee(t) => t.validate(), TransformBuilder::RedisSinkSingle(r) => r.validate(), @@ -181,7 +175,6 @@ impl TransformBuilder { TransformBuilder::CassandraSinkSingle(c) => c.is_terminating(), TransformBuilder::CassandraSinkCluster(c) => c.is_terminating(), TransformBuilder::CassandraPeersRewrite(c) => c.is_terminating(), - TransformBuilder::CassandraOptionsRewrite(c) => c.is_terminating(), TransformBuilder::RedisCache(r) => r.is_terminating(), TransformBuilder::Tee(t) => t.is_terminating(), TransformBuilder::RedisSinkSingle(r) => r.is_terminating(), @@ -223,7 +216,6 @@ pub enum Transforms { CassandraSinkCluster(Box), RedisSinkSingle(RedisSinkSingle), CassandraPeersRewrite(CassandraPeersRewrite), - CassandraOptionsRewrite(CassandraOptionsRewrite), RedisCache(SimpleRedisCache), Tee(Tee), Null(Null), @@ -258,7 +250,6 @@ impl Transforms { Transforms::CassandraSinkSingle(c) => c.transform(message_wrapper).await, Transforms::CassandraSinkCluster(c) => c.transform(message_wrapper).await, Transforms::CassandraPeersRewrite(c) => c.transform(message_wrapper).await, - Transforms::CassandraOptionsRewrite(c) => c.transform(message_wrapper).await, Transforms::RedisCache(r) => r.transform(message_wrapper).await, Transforms::Tee(m) => m.transform(message_wrapper).await, Transforms::DebugPrinter(p) => p.transform(message_wrapper).await, @@ -287,7 +278,6 @@ impl Transforms { match self { Transforms::CassandraSinkSingle(c) => c.transform_pushed(message_wrapper).await, Transforms::CassandraSinkCluster(c) => c.transform_pushed(message_wrapper).await, - Transforms::CassandraOptionsRewrite(c) => c.transform_pushed(message_wrapper).await, Transforms::CassandraPeersRewrite(c) => c.transform_pushed(message_wrapper).await, Transforms::RedisCache(r) => r.transform_pushed(message_wrapper).await, Transforms::Tee(m) => m.transform_pushed(message_wrapper).await, @@ -322,7 +312,6 @@ impl Transforms { Transforms::CassandraSinkSingle(a) => a.prep_transform_chain(t).await, Transforms::CassandraSinkCluster(a) => a.prep_transform_chain(t).await, Transforms::CassandraPeersRewrite(c) => c.prep_transform_chain(t).await, - Transforms::CassandraOptionsRewrite(c) => c.prep_transform_chain(t).await, Transforms::RedisSinkSingle(a) => a.prep_transform_chain(t).await, Transforms::RedisCache(a) => a.prep_transform_chain(t).await, Transforms::Tee(a) => a.prep_transform_chain(t).await, @@ -352,7 +341,6 @@ impl Transforms { Transforms::CassandraSinkSingle(c) => c.set_pushed_messages_tx(pushed_messages_tx), Transforms::CassandraSinkCluster(c) => c.set_pushed_messages_tx(pushed_messages_tx), Transforms::CassandraPeersRewrite(c) => c.set_pushed_messages_tx(pushed_messages_tx), - Transforms::CassandraOptionsRewrite(c) => c.set_pushed_messages_tx(pushed_messages_tx), Transforms::RedisCache(r) => r.set_pushed_messages_tx(pushed_messages_tx), Transforms::Tee(t) => t.set_pushed_messages_tx(pushed_messages_tx), Transforms::RedisSinkSingle(r) => r.set_pushed_messages_tx(pushed_messages_tx), @@ -386,7 +374,6 @@ pub enum TransformsConfig { CassandraSinkCluster(CassandraSinkClusterConfig), RedisSinkSingle(RedisSinkSingleConfig), CassandraPeersRewrite(CassandraPeersRewriteConfig), - CassandraOptionsRewrite(CassandraOptionsRewriteConfig), RedisCache(RedisConfig), Tee(TeeConfig), ConsistentScatter(ConsistentScatterConfig), @@ -420,7 +407,6 @@ impl TransformsConfig { TransformsConfig::CassandraSinkSingle(c) => c.get_transform(chain_name).await, TransformsConfig::CassandraSinkCluster(c) => c.get_transform(chain_name).await, TransformsConfig::CassandraPeersRewrite(c) => c.get_transform().await, - TransformsConfig::CassandraOptionsRewrite(c) => c.get_transform().await, TransformsConfig::RedisCache(r) => r.get_transform().await, TransformsConfig::Tee(t) => t.get_transform().await, TransformsConfig::RedisSinkSingle(r) => r.get_transform(chain_name).await, diff --git a/shotover-proxy/tests/cassandra_int_tests/mod.rs b/shotover-proxy/tests/cassandra_int_tests/mod.rs index 188da1bfc..8da93ae7a 100644 --- a/shotover-proxy/tests/cassandra_int_tests/mod.rs +++ b/shotover-proxy/tests/cassandra_int_tests/mod.rs @@ -1,14 +1,8 @@ use crate::helpers::ShotoverManager; use cassandra_protocol::frame::message_error::{ErrorBody, ErrorType}; -use cdrs_tokio::cluster::send_envelope::send_envelope; use cdrs_tokio::frame::events::{ SchemaChange, SchemaChangeOptions, SchemaChangeTarget, SchemaChangeType, ServerEvent, }; -use cdrs_tokio::frame::message_response::ResponseBody; -use cdrs_tokio::frame::message_supported::BodyResSupported; -use cdrs_tokio::frame::Envelope; -use cdrs_tokio::frame::Version; -use cdrs_tokio::retry::DefaultRetrySession; use futures::future::join_all; use futures::Future; use metrics_util::debugging::DebuggingRecorder; @@ -582,59 +576,6 @@ async fn peers_rewrite_v3(#[case] driver: CassandraDriver) { shotover.shutdown_and_then_consume_events(&[]).await; } -#[rstest] -#[case::cdrs(CdrsTokio)] -#[tokio::test(flavor = "multi_thread")] -#[serial] -async fn test_options_rewrite(#[case] driver: CassandraDriver) { - let _docker_compose = - DockerCompose::new("tests/test-configs/cassandra-options-rewrite/docker-compose.yaml"); - - let _shotover_manager = ShotoverManager::from_topology_file( - "tests/test-configs/cassandra-options-rewrite/topology.yaml", - ); - - let envelope = Envelope::new_req_options(Version::V4); - - let normal_connection = CassandraConnection::new("127.0.0.1", 9043, driver).await; - let normal_query_plan = normal_connection.as_cdrs().query_plan(None); - let normal_result = send_envelope( - normal_query_plan.into_iter(), - &envelope, - false, - Box::::default(), - ) - .await - .unwrap() - .unwrap() - .response_body() - .unwrap(); - if let ResponseBody::Supported(BodyResSupported { data }) = normal_result { - assert_eq!(*data.get("CQL_VERSION").unwrap(), vec!["3.4.5".to_string()]); - } - - let options_rewrite_connection = CassandraConnection::new("127.0.0.1", 9044, driver).await; - let options_rewrite_query_plan = options_rewrite_connection.as_cdrs().query_plan(None); - let options_rewrite_result = send_envelope( - options_rewrite_query_plan.into_iter(), - &envelope, - false, - Box::::default(), - ) - .await - .unwrap() - .unwrap() - .response_body() - .unwrap(); - - if let ResponseBody::Supported(BodyResSupported { data }) = options_rewrite_result { - assert_eq!( - *data.get("CQL_VERSION").unwrap(), - vec!["changed".to_string()] - ); - } -} - #[rstest] //#[case::cdrs(CdrsTokio)] // TODO: cdrs-tokio seems to be sending extra messages triggering the rate limiter #[case::scylla(Scylla)] diff --git a/shotover-proxy/tests/test-configs/cassandra-options-rewrite/docker-compose.yaml b/shotover-proxy/tests/test-configs/cassandra-options-rewrite/docker-compose.yaml deleted file mode 100644 index 47c6b152c..000000000 --- a/shotover-proxy/tests/test-configs/cassandra-options-rewrite/docker-compose.yaml +++ /dev/null @@ -1,14 +0,0 @@ -version: "3.3" -services: - cassandra-one: - image: shotover-int-tests/cassandra:4.0.6 - ports: - - "9042:9042" - environment: - MAX_HEAP_SIZE: "400M" - MIN_HEAP_SIZE: "400M" - HEAP_NEWSIZE: "48M" - volumes: - - type: tmpfs - target: /var/lib/cassandra - command: cassandra -f -Dcassandra.skip_wait_for_gossip_to_settle=0 -Dcassandra.initial_token=0 diff --git a/shotover-proxy/tests/test-configs/cassandra-options-rewrite/topology.yaml b/shotover-proxy/tests/test-configs/cassandra-options-rewrite/topology.yaml deleted file mode 100644 index ceef237db..000000000 --- a/shotover-proxy/tests/test-configs/cassandra-options-rewrite/topology.yaml +++ /dev/null @@ -1,24 +0,0 @@ ---- -sources: - cassandra_prod_1: - Cassandra: - listen_addr: "127.0.0.1:9043" - cassandra_prod_2: - Cassandra: - listen_addr: "127.0.0.1:9044" - -chain_config: - main_chain: - - CassandraSinkSingle: - remote_address: "127.0.0.1:9042" - connect_timeout_ms: 3000 - options_rewrite: - - CassandraOptionsRewrite: - map: - CQL_VERSION: ["changed"] - - CassandraSinkSingle: - remote_address: "127.0.0.1:9042" - connect_timeout_ms: 3000 -source_to_chain_mapping: - cassandra_prod_1: main_chain - cassandra_prod_2: options_rewrite diff --git a/test-helpers/src/connection/cassandra.rs b/test-helpers/src/connection/cassandra.rs index 5689db22f..394c3f5e0 100644 --- a/test-helpers/src/connection/cassandra.rs +++ b/test-helpers/src/connection/cassandra.rs @@ -179,6 +179,7 @@ impl CassandraConnection { ) .user("cassandra", "cassandra") .default_consistency(Consistency::One) + .compression(Some(scylla::transport::Compression::Snappy)) .build() .await .unwrap();