Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SUPPORTED rewrite #991

Merged
merged 7 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions shotover-proxy/src/codec/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,10 @@ mod cassandra_protocol_tests {
ColSpec, ColType, ColTypeOption, ColTypeOptionValue, RowsMetadata, RowsMetadataFlags,
TableSpec,
};
use cassandra_protocol::frame::message_startup::BodyReqStartup;
use cassandra_protocol::frame::Version;
use hex_literal::hex;
use std::collections::HashMap;
use tokio_util::codec::{Decoder, Encoder};

fn test_frame_codec_roundtrip(
Expand Down Expand Up @@ -277,12 +279,12 @@ mod cassandra_protocol_tests {
#[test]
fn test_codec_startup() {
let mut codec = CassandraCodec::new();
let mut startup_body: HashMap<String, String> = HashMap::new();
startup_body.insert("CQL_VERSION".into(), "3.0.0".into());
let bytes = hex!("0400000001000000160001000b43514c5f56455253494f4e0005332e302e30");
let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V4,
operation: CassandraOperation::Startup(vec![
0, 1, 0, 11, 67, 81, 76, 95, 86, 69, 82, 83, 73, 79, 78, 0, 5, 51, 46, 48, 46, 48,
]),
operation: CassandraOperation::Startup(BodyReqStartup { map: startup_body }),
stream_id: 0,
tracing: Tracing::Request(false),
warnings: vec![],
Expand Down
22 changes: 18 additions & 4 deletions shotover-proxy/src/frame/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use cassandra_protocol::frame::message_result::{
BodyResResultPrepared, BodyResResultSetKeyspace, ColSpec, ColTypeOption, ResResultBody,
ResultKind, RowsMetadata, RowsMetadataFlags,
};
use cassandra_protocol::frame::message_startup::BodyReqStartup;
use cassandra_protocol::frame::message_supported::BodyResSupported;
use cassandra_protocol::frame::{
Direction, Envelope as RawCassandraFrame, Flags, Opcode, Serialize, StreamId, Version,
};
Expand Down Expand Up @@ -276,11 +278,23 @@ impl CassandraFrame {
unreachable!("We already know the operation is an error")
}
}
Opcode::Startup => CassandraOperation::Startup(frame.body),
Opcode::Startup => {
if let RequestBody::Startup(body) = frame.request_body()? {
CassandraOperation::Startup(body)
} else {
unreachable!("We already know the operation is a startup")
}
}
Opcode::Ready => CassandraOperation::Ready(frame.body),
Opcode::Authenticate => CassandraOperation::Authenticate(frame.body),
Opcode::Options => CassandraOperation::Options(frame.body),
Opcode::Supported => CassandraOperation::Supported(frame.body),
Opcode::Supported => {
if let ResponseBody::Supported(body) = frame.response_body()? {
CassandraOperation::Supported(body)
} else {
unreachable!("we already know this is a supported");
}
}
Opcode::Prepare => CassandraOperation::Prepare(frame.body),
Opcode::Execute => {
if let RequestBody::Execute(body) = frame.request_body()? {
Expand Down Expand Up @@ -417,11 +431,11 @@ pub enum CassandraOperation {
Event(ServerEvent),
Batch(CassandraBatch),
// operations for protocol negotiation, should be ignored by transforms
Startup(Vec<u8>),
Startup(BodyReqStartup),
Ready(Vec<u8>),
Authenticate(Vec<u8>),
Options(Vec<u8>),
Supported(Vec<u8>),
Supported(BodyResSupported),
AuthChallenge(Vec<u8>),
AuthResponse(Vec<u8>),
AuthSuccess(Vec<u8>),
Expand Down
1 change: 1 addition & 0 deletions shotover-proxy/src/transforms/cassandra/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod connection;
pub mod options_rewrite;
pub mod peers_rewrite;
pub mod sink_cluster;
pub mod sink_single;
64 changes: 64 additions & 0 deletions shotover-proxy/src/transforms/cassandra/options_rewrite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use crate::frame::{CassandraOperation, Frame};
use crate::{
error::ChainResponse,
transforms::{Transform, TransformBuilder, Wrapper},
};
use anyhow::Result;
use async_trait::async_trait;
use cassandra_protocol::frame::message_supported::BodyResSupported;
use serde::Deserialize;
use std::collections::HashMap;

#[derive(Deserialize, Debug, Clone)]
pub struct CassandraOptionsRewriteConfig {
pub map: HashMap<String, Vec<String>>,
}

impl CassandraOptionsRewriteConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
Ok(TransformBuilder::CassandraOptionsRewrite(
CassandraOptionsRewrite::new(self.map.clone()),
))
}
}

#[derive(Clone)]
pub struct CassandraOptionsRewrite {
map: HashMap<String, Vec<String>>,
}

impl CassandraOptionsRewrite {
pub fn new(map: HashMap<String, Vec<String>>) -> Self {
CassandraOptionsRewrite { map }
}
}

#[async_trait]
impl Transform for CassandraOptionsRewrite {
async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse {
let mut response = message_wrapper.call_next_transform().await?;
for message in &mut response {
let mut invalidate = false;
if let Some(Frame::Cassandra(frame)) = message.frame() {
if let CassandraOperation::Supported(BodyResSupported { data }) =
&mut frame.operation
{
for key in self.map.keys() {
data.insert(key.clone(), self.map.get(key).unwrap().clone());
}
invalidate = true;
}
}
if invalidate {
message.invalidate_cache();
}
}

Ok(response)
}

async fn transform_pushed<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse {
let response = message_wrapper.call_next_transform_pushed().await?;
Ok(response)
}
}
25 changes: 20 additions & 5 deletions shotover-proxy/src/transforms/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use self::cassandra::sink_cluster::CassandraSinkClusterBuilder;
use crate::error::ChainResponse;
use crate::message::Messages;
use crate::transforms::cassandra::peers_rewrite::CassandraPeersRewrite;
use crate::transforms::cassandra::peers_rewrite::CassandraPeersRewriteConfig;
use crate::transforms::cassandra::sink_cluster::CassandraSinkCluster;
use crate::transforms::cassandra::sink_cluster::CassandraSinkClusterConfig;
use crate::transforms::cassandra::options_rewrite::{
CassandraOptionsRewrite, CassandraOptionsRewriteConfig,
};
use crate::transforms::cassandra::peers_rewrite::{
CassandraPeersRewrite, CassandraPeersRewriteConfig,
};
use crate::transforms::cassandra::sink_cluster::{
CassandraSinkCluster, CassandraSinkClusterBuilder, CassandraSinkClusterConfig,
};
use crate::transforms::cassandra::sink_single::{CassandraSinkSingle, CassandraSinkSingleConfig};
use crate::transforms::chain::{TransformChain, TransformChainBuilder};
use crate::transforms::coalesce::{Coalesce, CoalesceConfig};
Expand Down Expand Up @@ -80,6 +84,7 @@ pub enum TransformBuilder {
CassandraSinkCluster(Box<CassandraSinkClusterBuilder>),
RedisSinkSingle(RedisSinkSingle),
CassandraPeersRewrite(CassandraPeersRewrite),
CassandraOptionsRewrite(CassandraOptionsRewrite),
RedisCache(SimpleRedisCacheBuilder),
Tee(Tee),
Null(Null),
Expand Down Expand Up @@ -110,6 +115,7 @@ impl TransformBuilder {
Transforms::CassandraSinkCluster(t.build())
}
TransformBuilder::CassandraPeersRewrite(t) => Transforms::CassandraPeersRewrite(t),
TransformBuilder::CassandraOptionsRewrite(t) => Transforms::CassandraOptionsRewrite(t),
TransformBuilder::RedisCache(t) => Transforms::RedisCache(t.build()),
TransformBuilder::Tee(t) => Transforms::Tee(t),
TransformBuilder::RedisSinkSingle(t) => Transforms::RedisSinkSingle(t),
Expand Down Expand Up @@ -145,6 +151,7 @@ impl TransformBuilder {
TransformBuilder::CassandraSinkSingle(c) => c.validate(),
TransformBuilder::CassandraSinkCluster(c) => c.validate(),
TransformBuilder::CassandraPeersRewrite(c) => c.validate(),
TransformBuilder::CassandraOptionsRewrite(c) => c.validate(),
TransformBuilder::RedisCache(r) => r.validate(),
TransformBuilder::Tee(t) => t.validate(),
TransformBuilder::RedisSinkSingle(r) => r.validate(),
Expand Down Expand Up @@ -174,6 +181,7 @@ impl TransformBuilder {
TransformBuilder::CassandraSinkSingle(c) => c.is_terminating(),
TransformBuilder::CassandraSinkCluster(c) => c.is_terminating(),
TransformBuilder::CassandraPeersRewrite(c) => c.is_terminating(),
TransformBuilder::CassandraOptionsRewrite(c) => c.is_terminating(),
TransformBuilder::RedisCache(r) => r.is_terminating(),
TransformBuilder::Tee(t) => t.is_terminating(),
TransformBuilder::RedisSinkSingle(r) => r.is_terminating(),
Expand Down Expand Up @@ -215,6 +223,7 @@ pub enum Transforms {
CassandraSinkCluster(Box<CassandraSinkCluster>),
RedisSinkSingle(RedisSinkSingle),
CassandraPeersRewrite(CassandraPeersRewrite),
CassandraOptionsRewrite(CassandraOptionsRewrite),
RedisCache(SimpleRedisCache),
Tee(Tee),
Null(Null),
Expand Down Expand Up @@ -249,6 +258,7 @@ impl Transforms {
Transforms::CassandraSinkSingle(c) => c.transform(message_wrapper).await,
Transforms::CassandraSinkCluster(c) => c.transform(message_wrapper).await,
Transforms::CassandraPeersRewrite(c) => c.transform(message_wrapper).await,
Transforms::CassandraOptionsRewrite(c) => c.transform(message_wrapper).await,
Transforms::RedisCache(r) => r.transform(message_wrapper).await,
Transforms::Tee(m) => m.transform(message_wrapper).await,
Transforms::DebugPrinter(p) => p.transform(message_wrapper).await,
Expand Down Expand Up @@ -277,6 +287,7 @@ impl Transforms {
match self {
Transforms::CassandraSinkSingle(c) => c.transform_pushed(message_wrapper).await,
Transforms::CassandraSinkCluster(c) => c.transform_pushed(message_wrapper).await,
Transforms::CassandraOptionsRewrite(c) => c.transform_pushed(message_wrapper).await,
Transforms::CassandraPeersRewrite(c) => c.transform_pushed(message_wrapper).await,
Transforms::RedisCache(r) => r.transform_pushed(message_wrapper).await,
Transforms::Tee(m) => m.transform_pushed(message_wrapper).await,
Expand Down Expand Up @@ -311,6 +322,7 @@ impl Transforms {
Transforms::CassandraSinkSingle(a) => a.prep_transform_chain(t).await,
Transforms::CassandraSinkCluster(a) => a.prep_transform_chain(t).await,
Transforms::CassandraPeersRewrite(c) => c.prep_transform_chain(t).await,
Transforms::CassandraOptionsRewrite(c) => c.prep_transform_chain(t).await,
Transforms::RedisSinkSingle(a) => a.prep_transform_chain(t).await,
Transforms::RedisCache(a) => a.prep_transform_chain(t).await,
Transforms::Tee(a) => a.prep_transform_chain(t).await,
Expand Down Expand Up @@ -340,6 +352,7 @@ impl Transforms {
Transforms::CassandraSinkSingle(c) => c.set_pushed_messages_tx(pushed_messages_tx),
Transforms::CassandraSinkCluster(c) => c.set_pushed_messages_tx(pushed_messages_tx),
Transforms::CassandraPeersRewrite(c) => c.set_pushed_messages_tx(pushed_messages_tx),
Transforms::CassandraOptionsRewrite(c) => c.set_pushed_messages_tx(pushed_messages_tx),
Transforms::RedisCache(r) => r.set_pushed_messages_tx(pushed_messages_tx),
Transforms::Tee(t) => t.set_pushed_messages_tx(pushed_messages_tx),
Transforms::RedisSinkSingle(r) => r.set_pushed_messages_tx(pushed_messages_tx),
Expand Down Expand Up @@ -373,6 +386,7 @@ pub enum TransformsConfig {
CassandraSinkCluster(CassandraSinkClusterConfig),
RedisSinkSingle(RedisSinkSingleConfig),
CassandraPeersRewrite(CassandraPeersRewriteConfig),
CassandraOptionsRewrite(CassandraOptionsRewriteConfig),
RedisCache(RedisConfig),
Tee(TeeConfig),
ConsistentScatter(ConsistentScatterConfig),
Expand Down Expand Up @@ -406,6 +420,7 @@ impl TransformsConfig {
TransformsConfig::CassandraSinkSingle(c) => c.get_transform(chain_name).await,
TransformsConfig::CassandraSinkCluster(c) => c.get_transform(chain_name).await,
TransformsConfig::CassandraPeersRewrite(c) => c.get_transform().await,
TransformsConfig::CassandraOptionsRewrite(c) => c.get_transform().await,
TransformsConfig::RedisCache(r) => r.get_transform().await,
TransformsConfig::Tee(t) => t.get_transform().await,
TransformsConfig::RedisSinkSingle(r) => r.get_transform(chain_name).await,
Expand Down
6 changes: 5 additions & 1 deletion shotover-proxy/tests/cassandra_int_tests/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use cassandra_protocol::frame::message_startup::BodyReqStartup;
use cassandra_protocol::frame::Version;
use shotover_proxy::frame::{cassandra::Tracing, CassandraFrame, CassandraOperation, Frame};
use shotover_proxy::message::Message;
Expand Down Expand Up @@ -57,13 +58,16 @@ pub async fn run_topology_task(ca_path: Option<&str>, port: Option<u32>) -> Vec<
}

fn create_handshake() -> Vec<Message> {
let mut startup_body: HashMap<String, String> = HashMap::new();
startup_body.insert("CQL_VERSION".into(), "3.0.0".into());

vec![
Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V4,
stream_id: 64,
tracing: Tracing::Request(false),
warnings: vec![],
operation: CassandraOperation::Startup(b"\0\x01\0\x0bCQL_VERSION\0\x053.0.0".to_vec()),
operation: CassandraOperation::Startup(BodyReqStartup { map: startup_body }),
})),
Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V4,
Expand Down
59 changes: 59 additions & 0 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
use crate::helpers::ShotoverManager;
use cassandra_protocol::frame::message_error::{ErrorBody, ErrorType};
use cdrs_tokio::cluster::send_envelope::send_envelope;
use cdrs_tokio::frame::events::{
SchemaChange, SchemaChangeOptions, SchemaChangeTarget, SchemaChangeType, ServerEvent,
};
use cdrs_tokio::frame::message_response::ResponseBody;
use cdrs_tokio::frame::message_supported::BodyResSupported;
use cdrs_tokio::frame::Envelope;
use cdrs_tokio::frame::Version;
use cdrs_tokio::retry::DefaultRetrySession;
use futures::future::join_all;
use futures::Future;
use metrics_util::debugging::DebuggingRecorder;
Expand Down Expand Up @@ -576,6 +582,59 @@ async fn peers_rewrite_v3(#[case] driver: CassandraDriver) {
shotover.shutdown_and_then_consume_events(&[]).await;
}

#[rstest]
#[case::cdrs(CdrsTokio)]
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_options_rewrite(#[case] driver: CassandraDriver) {
let _docker_compose =
DockerCompose::new("tests/test-configs/cassandra-options-rewrite/docker-compose.yaml");

let _shotover_manager = ShotoverManager::from_topology_file(
"tests/test-configs/cassandra-options-rewrite/topology.yaml",
);

let envelope = Envelope::new_req_options(Version::V4);

let normal_connection = CassandraConnection::new("127.0.0.1", 9043, driver).await;
let normal_query_plan = normal_connection.as_cdrs().query_plan(None);
let normal_result = send_envelope(
normal_query_plan.into_iter(),
&envelope,
false,
Box::<DefaultRetrySession>::default(),
)
.await
.unwrap()
.unwrap()
.response_body()
.unwrap();
if let ResponseBody::Supported(BodyResSupported { data }) = normal_result {
assert_eq!(*data.get("CQL_VERSION").unwrap(), vec!["3.4.5".to_string()]);
}

let options_rewrite_connection = CassandraConnection::new("127.0.0.1", 9044, driver).await;
let options_rewrite_query_plan = options_rewrite_connection.as_cdrs().query_plan(None);
let options_rewrite_result = send_envelope(
options_rewrite_query_plan.into_iter(),
&envelope,
false,
Box::<DefaultRetrySession>::default(),
)
.await
.unwrap()
.unwrap()
.response_body()
.unwrap();

if let ResponseBody::Supported(BodyResSupported { data }) = options_rewrite_result {
assert_eq!(
*data.get("CQL_VERSION").unwrap(),
vec!["changed".to_string()]
);
}
}

#[rstest]
//#[case::cdrs(CdrsTokio)] // TODO: cdrs-tokio seems to be sending extra messages triggering the rate limiter
#[case::scylla(Scylla)]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version: "3.3"
services:
cassandra-one:
image: shotover-int-tests/cassandra:4.0.6
ports:
- "9042:9042"
environment:
MAX_HEAP_SIZE: "400M"
MIN_HEAP_SIZE: "400M"
HEAP_NEWSIZE: "48M"
volumes:
- type: tmpfs
target: /var/lib/cassandra
command: cassandra -f -Dcassandra.skip_wait_for_gossip_to_settle=0 -Dcassandra.initial_token=0
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
sources:
cassandra_prod_1:
Cassandra:
listen_addr: "127.0.0.1:9043"
cassandra_prod_2:
Cassandra:
listen_addr: "127.0.0.1:9044"

chain_config:
main_chain:
- CassandraSinkSingle:
remote_address: "127.0.0.1:9042"
connect_timeout_ms: 3000
options_rewrite:
- CassandraOptionsRewrite:
map:
CQL_VERSION: ["changed"]
- CassandraSinkSingle:
remote_address: "127.0.0.1:9042"
connect_timeout_ms: 3000
source_to_chain_mapping:
cassandra_prod_1: main_chain
cassandra_prod_2: options_rewrite