Skip to content

Commit

Permalink
Add counter for when CassandraSinkCluster routes execute message outs…
Browse files Browse the repository at this point in the history
…ide of rack (#1012)
  • Loading branch information
rukai authored Feb 9, 2023
1 parent 26eea8c commit 3a5f4b4
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 13 deletions.
4 changes: 2 additions & 2 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl CassandraSinkClusterBuilder {
connect_timeout_ms: u64,
timeout: Option<u64>,
) -> Self {
let failed_requests = register_counter!("failed_requests", "chain" => chain_name, "transform" => "CassandraSinkCluster");
let failed_requests = register_counter!("failed_requests", "chain" => chain_name.clone(), "transform" => "CassandraSinkCluster");
let receive_timeout = timeout.map(Duration::from_secs);
let connect_timeout = Duration::from_millis(connect_timeout_ms);

Expand All @@ -160,7 +160,7 @@ impl CassandraSinkClusterBuilder {
nodes_rx: local_nodes_rx,
keyspaces_rx,
task_handshake_tx,
pool: NodePoolBuilder::new(),
pool: NodePoolBuilder::new(chain_name),
}
}

Expand Down
25 changes: 16 additions & 9 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use cassandra_protocol::frame::message_execute::BodyReqExecuteOwned;
use cassandra_protocol::frame::Version;
use cassandra_protocol::token::Murmur3Token;
use cassandra_protocol::types::CBytesShort;
use metrics::{register_counter, Counter};
use rand::prelude::*;
use split_iter::Splittable;
use std::sync::Arc;
Expand Down Expand Up @@ -35,12 +36,14 @@ pub struct KeyspaceMetadata {
#[derive(Clone)]
pub struct NodePoolBuilder {
prepared_metadata: Arc<RwLock<HashMap<CBytesShort, PreparedMetadata>>>,
out_of_rack_requests: Counter,
}

impl NodePoolBuilder {
pub fn new() -> Self {
pub fn new(chain_name: String) -> Self {
Self {
prepared_metadata: Arc::new(RwLock::new(HashMap::new())),
out_of_rack_requests: register_counter!("out_of_rack_requests", "chain" => chain_name, "transform" => "CassandraSinkCluster"),
}
}

Expand All @@ -51,17 +54,18 @@ impl NodePoolBuilder {
token_map: TokenMap::new(&[]),
nodes: vec![],
prev_idx: 0,
out_of_rack_requests: self.out_of_rack_requests.clone(),
}
}
}

#[derive(Debug)]
pub struct NodePool {
prepared_metadata: Arc<RwLock<HashMap<CBytesShort, PreparedMetadata>>>,
keyspace_metadata: HashMap<String, KeyspaceMetadata>,
token_map: TokenMap,
nodes: Vec<CassandraNode>,
prev_idx: usize,
out_of_rack_requests: Counter,
}

impl NodePool {
Expand Down Expand Up @@ -138,9 +142,7 @@ impl NodePool {

self.prev_idx = (self.prev_idx + 1) % up_indexes.len();

self.nodes
.get_mut(*up_indexes.get(self.prev_idx).unwrap())
.unwrap()
&mut self.nodes[up_indexes[self.prev_idx]]
}

/// Get a token routed replica node for the supplied execute message (if exists)
Expand Down Expand Up @@ -195,10 +197,15 @@ impl NodePool {
.split(|node| node.rack == rack);

if let Some(rack_replica) = rack_replicas.choose(rng) {
return Ok(Some(rack_replica));
Ok(Some(rack_replica))
} else {
// An execute message is being delivered outside of CassandraSinkCluster's designated rack. The only cases this can occur is when:
// The client correctly routes to the shotover node that reports it has the token in its rack, however the destination cassandra node has since gone down and is now inaccessible.
// or
// The clients token aware routing is broken.
self.out_of_rack_requests.increment(1);
Ok(dc_replicas.choose(rng))
}

Ok(dc_replicas.choose(rng))
}
}

Expand All @@ -212,7 +219,7 @@ mod test_node_pool {
fn test_round_robin() {
let nodes = prepare_nodes();

let mut node_pool = NodePoolBuilder::new().build();
let mut node_pool = NodePoolBuilder::new("chain".to_owned()).build();
let (_nodes_tx, mut nodes_rx) = watch::channel(nodes.clone());
node_pool.update_nodes(&mut nodes_rx);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod test_token_aware_router {
let mut rng = SmallRng::from_rng(rand::thread_rng()).unwrap();

let nodes = prepare_nodes();
let mut router = NodePoolBuilder::new().build();
let mut router = NodePoolBuilder::new("chain".to_owned()).build();
let (_nodes_tx, mut nodes_rx) = watch::channel(nodes);
router.update_nodes(&mut nodes_rx);

Expand Down
11 changes: 10 additions & 1 deletion shotover-proxy/tests/cassandra_int_tests/cluster/multi_rack.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::cassandra_int_tests::cluster::run_topology_task;
use std::net::SocketAddr;
use test_helpers::connection::cassandra::{assert_query_result, CassandraConnection, ResultValue};
use test_helpers::{
connection::cassandra::{assert_query_result, CassandraConnection, ResultValue},
metrics::get_metrics_value,
};

async fn test_rewrite_system_peers(connection: &CassandraConnection) {
let star_results = [
Expand Down Expand Up @@ -151,6 +154,12 @@ pub async fn test(connection: &CassandraConnection) {
test_rewrite_system_local(connection).await;
test_rewrite_system_peers(connection).await;
test_rewrite_system_peers_v2(connection).await;

let out_of_rack_request = get_metrics_value(
"out_of_rack_requests{chain=\"main_chain\",transform=\"CassandraSinkCluster\"}",
)
.await;
assert_eq!(out_of_rack_request, "0");
}

pub async fn test_topology_task(ca_path: Option<&str>) {
Expand Down

0 comments on commit 3a5f4b4

Please sign in to comment.