Skip to content

Commit

Permalink
Add CassandraSinkCluster (#717)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Aug 2, 2022
1 parent 7394ad1 commit f10252d
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
version: "3.3"
networks:
cluster_subnet:
name: cluster_subnet
driver: bridge
ipam:
driver: default
config:
- subnet: 172.16.1.0/24
gateway: 172.16.1.1

services:
cassandra-one:
image: bitnami/cassandra:3.11
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
healthcheck:
&healthcheck
test: [ "CMD", "cqlsh", "-e", "describe keyspaces" ]
interval: 5s
timeout: 5s
retries: 60
environment:
&environment
CASSANDRA_SEEDS: "cassandra-one,cassandra-two"
CASSANDRA_CLUSTER_NAME: TestCluster
CASSANDRA_DC: dc1
CASSANDRA_RACK: rack1
CASSANDRA_ENDPOINT_SNITCH: GossipingPropertyFileSnitch
CASSANDRA_NUM_TOKENS: 128
MAX_HEAP_SIZE: "400M"
MIN_HEAP_SIZE: "400M"
HEAP_NEWSIZE: "48M"
CASSANDRA_ENABLE_SCRIPTED_USER_DEFINED_FUNCTIONS: "true"
CASSANDRA_ENABLE_USER_DEFINED_FUNCTIONS: "true"

cassandra-two:
image: bitnami/cassandra:3.11
networks:
cluster_subnet:
ipv4_address: 172.16.1.3
healthcheck: *healthcheck
environment: *environment

cassandra-three:
image: bitnami/cassandra:3.11
networks:
cluster_subnet:
ipv4_address: 172.16.1.4
healthcheck: *healthcheck
environment: *environment
12 changes: 12 additions & 0 deletions shotover-proxy/example-configs/cassandra-cluster/topology.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
sources:
cassandra_prod:
Cassandra:
listen_addr: "127.0.0.1:9042"
chain_config:
main_chain:
- CassandraSinkCluster:
first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
data_center: "dc1"
source_to_chain_mapping:
cassandra_prod: main_chain
1 change: 1 addition & 0 deletions shotover-proxy/src/transforms/cassandra/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod connection;
pub mod peers_rewrite;
pub mod sink_cluster;
pub mod sink_single;
165 changes: 165 additions & 0 deletions shotover-proxy/src/transforms/cassandra/sink_cluster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use super::connection::CassandraConnection;
use crate::codec::cassandra::CassandraCodec;
use crate::concurrency::FuturesOrdered;
use crate::error::ChainResponse;
use crate::frame::cassandra;
use crate::message::Messages;
use crate::tls::TlsConfig;
use crate::tls::TlsConnector;
use crate::transforms::util::Response;
use crate::transforms::{Transform, Transforms, Wrapper};
use anyhow::Result;
use async_trait::async_trait;
use cassandra_protocol::frame::Opcode;
use metrics::{register_counter, Counter};
use serde::Deserialize;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;
use tokio_stream::StreamExt;
use tracing::{error, trace};

#[derive(Deserialize, Debug, Clone)]
pub struct CassandraSinkClusterConfig {
pub first_contact_points: Vec<String>,
pub data_center: String,
pub tls: Option<TlsConfig>,
}

impl CassandraSinkClusterConfig {
pub async fn get_transform(&self, chain_name: String) -> Result<Transforms> {
let tls = self.tls.clone().map(TlsConnector::new).transpose()?;
Ok(Transforms::CassandraSinkCluster(CassandraSinkCluster::new(
self.first_contact_points.clone(),
chain_name,
tls,
)))
}
}

pub struct CassandraSinkCluster {
contact_points: Vec<String>,
outbound: Option<CassandraConnection>,
chain_name: String,
failed_requests: Counter,
tls: Option<TlsConnector>,
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
}

impl Clone for CassandraSinkCluster {
fn clone(&self) -> Self {
CassandraSinkCluster {
contact_points: self.contact_points.clone(),
outbound: None,
chain_name: self.chain_name.clone(),
tls: self.tls.clone(),
failed_requests: self.failed_requests.clone(),
pushed_messages_tx: None,
}
}
}

impl CassandraSinkCluster {
pub fn new(
contact_points: Vec<String>,
chain_name: String,
tls: Option<TlsConnector>,
) -> CassandraSinkCluster {
let failed_requests = register_counter!("failed_requests", "chain" => chain_name.clone(), "transform" => "CassandraSinkCluster");

CassandraSinkCluster {
contact_points,
outbound: None,
chain_name,
failed_requests,
tls,
pushed_messages_tx: None,
}
}
}

impl CassandraSinkCluster {
async fn send_message(&mut self, messages: Messages) -> ChainResponse {
if self.outbound.is_none() {
trace!("creating outbound connection {:?}", self.contact_points);
self.outbound = Some(
CassandraConnection::new(
self.contact_points[0].clone(),
CassandraCodec::new(),
self.tls.clone(),
self.pushed_messages_tx.clone(),
)
.await?,
);
}

let outbound = self.outbound.as_mut().unwrap();
let expected_size = messages.len();
let results: Result<FuturesOrdered<oneshot::Receiver<Response>>> = messages
.into_iter()
.map(|m| {
let (return_chan_tx, return_chan_rx) = oneshot::channel();
outbound.send(m, return_chan_tx)?;

Ok(return_chan_rx)
})
.collect();

let mut responses = Vec::with_capacity(expected_size);
let mut results = results?;

loop {
match timeout(Duration::from_secs(5), results.next()).await {
Ok(Some(prelim)) => {
match prelim? {
Response {
response: Ok(message),
..
} => {
if let Some(raw_bytes) = message.as_raw_bytes() {
if let Ok(Opcode::Error) =
cassandra::raw_frame::get_opcode(raw_bytes)
{
self.failed_requests.increment(1);
}
}
responses.push(message);
}
Response {
mut original,
response: Err(err),
} => {
original.set_error(err.to_string());
responses.push(original);
}
};
}
Ok(None) => break,
Err(_) => {
error!(
"timed out waiting for responses, received {:?} responses but expected {:?} responses",
responses.len(),
expected_size
);
}
}
}

Ok(responses)
}
}

#[async_trait]
impl Transform for CassandraSinkCluster {
async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse {
self.send_message(message_wrapper.messages).await
}

fn is_terminating(&self) -> bool {
true
}

fn add_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender<Messages>) {
self.pushed_messages_tx = Some(pushed_messages_tx);
}
}
14 changes: 14 additions & 0 deletions shotover-proxy/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ use crate::error::ChainResponse;
use crate::message::Messages;
use crate::transforms::cassandra::peers_rewrite::CassandraPeersRewrite;
use crate::transforms::cassandra::peers_rewrite::CassandraPeersRewriteConfig;
use crate::transforms::cassandra::sink_cluster::CassandraSinkCluster;
#[cfg(feature = "alpha-transforms")]
use crate::transforms::cassandra::sink_cluster::CassandraSinkClusterConfig;
use crate::transforms::cassandra::sink_single::{CassandraSinkSingle, CassandraSinkSingleConfig};
use crate::transforms::chain::TransformChain;
use crate::transforms::coalesce::{Coalesce, CoalesceConfig};
Expand Down Expand Up @@ -74,6 +77,7 @@ pub mod util;
#[derive(Clone, IntoStaticStr)]
pub enum Transforms {
CassandraSinkSingle(CassandraSinkSingle),
CassandraSinkCluster(CassandraSinkCluster),
RedisSinkSingle(RedisSinkSingle),
CassandraPeersRewrite(CassandraPeersRewrite),
RedisCache(SimpleRedisCache),
Expand Down Expand Up @@ -108,6 +112,7 @@ impl Transforms {
async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse {
match self {
Transforms::CassandraSinkSingle(c) => c.transform(message_wrapper).await,
Transforms::CassandraSinkCluster(c) => c.transform(message_wrapper).await,
Transforms::CassandraPeersRewrite(c) => c.transform(message_wrapper).await,
Transforms::RedisCache(r) => r.transform(message_wrapper).await,
Transforms::Tee(m) => m.transform(message_wrapper).await,
Expand Down Expand Up @@ -136,6 +141,7 @@ impl Transforms {
async fn transform_pushed<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse {
match self {
Transforms::CassandraSinkSingle(c) => c.transform_pushed(message_wrapper).await,
Transforms::CassandraSinkCluster(c) => c.transform_pushed(message_wrapper).await,
Transforms::CassandraPeersRewrite(c) => c.transform_pushed(message_wrapper).await,
Transforms::RedisCache(r) => r.transform_pushed(message_wrapper).await,
Transforms::Tee(m) => m.transform_pushed(message_wrapper).await,
Expand Down Expand Up @@ -168,6 +174,7 @@ impl Transforms {
async fn _prep_transform_chain(&mut self, t: &mut TransformChain) -> Result<()> {
match self {
Transforms::CassandraSinkSingle(a) => a.prep_transform_chain(t).await,
Transforms::CassandraSinkCluster(a) => a.prep_transform_chain(t).await,
Transforms::CassandraPeersRewrite(c) => c.prep_transform_chain(t).await,
Transforms::RedisSinkSingle(a) => a.prep_transform_chain(t).await,
Transforms::RedisCache(a) => a.prep_transform_chain(t).await,
Expand Down Expand Up @@ -196,6 +203,7 @@ impl Transforms {
fn validate(&self) -> Vec<String> {
match self {
Transforms::CassandraSinkSingle(c) => c.validate(),
Transforms::CassandraSinkCluster(c) => c.validate(),
Transforms::CassandraPeersRewrite(c) => c.validate(),
Transforms::RedisCache(r) => r.validate(),
Transforms::Tee(t) => t.validate(),
Expand Down Expand Up @@ -224,6 +232,7 @@ impl Transforms {
fn is_terminating(&self) -> bool {
match self {
Transforms::CassandraSinkSingle(c) => c.is_terminating(),
Transforms::CassandraSinkCluster(c) => c.is_terminating(),
Transforms::CassandraPeersRewrite(c) => c.is_terminating(),
Transforms::RedisCache(r) => r.is_terminating(),
Transforms::Tee(t) => t.is_terminating(),
Expand Down Expand Up @@ -252,6 +261,7 @@ impl Transforms {
fn add_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender<Messages>) {
match self {
Transforms::CassandraSinkSingle(c) => c.add_pushed_messages_tx(pushed_messages_tx),
Transforms::CassandraSinkCluster(c) => c.add_pushed_messages_tx(pushed_messages_tx),
Transforms::CassandraPeersRewrite(c) => c.add_pushed_messages_tx(pushed_messages_tx),
Transforms::RedisCache(r) => r.add_pushed_messages_tx(pushed_messages_tx),
Transforms::Tee(t) => t.add_pushed_messages_tx(pushed_messages_tx),
Expand Down Expand Up @@ -283,6 +293,8 @@ impl Transforms {
#[derive(Deserialize, Debug, Clone)]
pub enum TransformsConfig {
CassandraSinkSingle(CassandraSinkSingleConfig),
#[cfg(feature = "alpha-transforms")]
CassandraSinkCluster(CassandraSinkClusterConfig),
RedisSinkSingle(RedisSinkSingleConfig),
CassandraPeersRewrite(CassandraPeersRewriteConfig),
RedisCache(RedisConfig),
Expand Down Expand Up @@ -314,6 +326,8 @@ impl TransformsConfig {
pub async fn get_transform(&self, chain_name: String) -> Result<Transforms> {
match self {
TransformsConfig::CassandraSinkSingle(c) => c.get_transform(chain_name).await,
#[cfg(feature = "alpha-transforms")]
TransformsConfig::CassandraSinkCluster(c) => c.get_transform(chain_name).await,
TransformsConfig::CassandraPeersRewrite(c) => c.get_transform().await,
TransformsConfig::RedisCache(r) => r.get_transform().await,
TransformsConfig::Tee(t) => t.get_transform().await,
Expand Down
21 changes: 21 additions & 0 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,27 @@ fn test_passthrough() {
batch_statements::test(&connection);
}

#[test]
#[serial]
#[cfg(feature = "alpha-transforms")]
fn test_cluster() {
let _compose = DockerCompose::new("example-configs/cassandra-cluster/docker-compose.yml");

let shotover_manager =
ShotoverManager::from_topology_file("example-configs/cassandra-cluster/topology.yaml");

let connection = shotover_manager.cassandra_connection("127.0.0.1", 9042);

keyspace::test(&connection);
table::test(&connection);
udt::test(&connection);
native_types::test(&connection);
collections::test(&connection);
functions::test(&connection);
prepared_statements::test(&connection);
batch_statements::test(&connection);
}

#[test]
#[serial]
fn test_source_tls_and_single_tls() {
Expand Down
3 changes: 2 additions & 1 deletion test-helpers/src/docker_compose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ impl DockerCompose {
| "tests/test-configs/cassandra-peers-rewrite/docker-compose-3.11-cassandra.yaml" => {
self.wait_for_log("Startup complete", 2, 110)
}
"example-configs-docker/cassandra-peers-rewrite/docker-compose.yml" => {
"example-configs-docker/cassandra-peers-rewrite/docker-compose.yml"
| "example-configs/cassandra-cluster/docker-compose.yml" => {
self.wait_for_log("Startup complete", 3, 180)
}
path => unimplemented!(
Expand Down

0 comments on commit f10252d

Please sign in to comment.