Skip to content

Commit

Permalink
Round Robin load balancing for CassandraSinkCluster (#846)
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored Oct 18, 2022
1 parent 4ba02ba commit dcf76d4
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 15 deletions.
13 changes: 6 additions & 7 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl CassandraSinkCluster {
.unwrap()
} else {
self.pool
.get_random_node_in_dc_rack(&self.local_shotover_node.rack, &mut self.rng)
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack)
.address
};

Expand Down Expand Up @@ -348,10 +348,9 @@ impl CassandraSinkCluster {
.send(message, return_chan_tx)?;
}
Ok(None) => {
let node = self.pool.get_random_node_in_dc_rack(
&self.local_shotover_node.rack,
&mut self.rng,
);
let node = self
.pool
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack);
node.get_connection(&self.connection_factory)
.await?
.send(message, return_chan_tx)?;
Expand Down Expand Up @@ -390,7 +389,7 @@ impl CassandraSinkCluster {
} else {
let node = self
.pool
.get_random_node_in_dc_rack(&self.local_shotover_node.rack, &mut self.rng);
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack);
node.get_connection(&self.connection_factory)
.await?
.send(message, return_chan_tx)?;
Expand Down Expand Up @@ -499,7 +498,7 @@ impl CassandraSinkCluster {
// Therefore we need to recreate the control connection to ensure that it is in the configured data_center/rack.
let random_address = self
.pool
.get_random_node_in_dc_rack(&self.local_shotover_node.rack, &mut self.rng)
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack)
.address;
self.init_handshake_connection = Some(
self.connection_factory
Expand Down
153 changes: 145 additions & 8 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct NodePool {
prepared_metadata: Arc<RwLock<HashMap<CBytesShort, PreparedMetadata>>>,
token_map: TokenMap,
nodes: Vec<CassandraNode>,
prev_idx: usize,
}

impl Clone for NodePool {
Expand All @@ -30,6 +31,7 @@ impl Clone for NodePool {
prepared_metadata: self.prepared_metadata.clone(),
token_map: TokenMap::new(&[]),
nodes: vec![],
prev_idx: 0,
}
}
}
Expand All @@ -40,6 +42,7 @@ impl NodePool {
token_map: TokenMap::new(nodes.as_slice()),
nodes,
prepared_metadata: Arc::new(RwLock::new(HashMap::new())),
prev_idx: 0,
}
}

Expand Down Expand Up @@ -70,15 +73,24 @@ impl NodePool {
write_lock.insert(id, metadata);
}

pub fn get_random_node_in_dc_rack(
&mut self,
rack: &String,
rng: &mut SmallRng,
) -> &mut CassandraNode {
pub fn get_round_robin_node_in_dc_rack(&mut self, rack: &str) -> &mut CassandraNode {
let up_indexes: Vec<usize> = self
.nodes
.iter()
.enumerate()
.filter_map(|(i, node)| {
if node.is_up && node.rack == *rack {
Some(i)
} else {
None
}
})
.collect();

self.prev_idx = (self.prev_idx + 1) % up_indexes.len();

self.nodes
.iter_mut()
.filter(|x| x.rack == *rack && x.is_up)
.choose(rng)
.get_mut(*up_indexes.get(self.prev_idx).unwrap())
.unwrap()
}

Expand Down Expand Up @@ -121,3 +133,128 @@ impl NodePool {
Ok(None)
}
}

#[cfg(test)]
mod test_node_pool {
    use super::*;
    use crate::transforms::cassandra::sink_cluster::CassandraNode;
    use uuid::uuid;

    /// Builds 8 nodes in "rack1" with sequential addresses 172.16.1.0..7.
    ///
    /// Every node gets a distinct host id (the previous fixture accidentally
    /// reused `…8f7c` for three nodes) so a bug that confuses two nodes
    /// cannot go unnoticed in tests built on this fixture.
    fn prepare_nodes() -> Vec<CassandraNode> {
        let host_ids = [
            uuid!("2dd022d6-2937-4754-89d6-02d2933a8f70"),
            uuid!("2dd022d6-2937-4754-89d6-02d2933a8f71"),
            uuid!("2dd022d6-2937-4754-89d6-02d2933a8f72"),
            uuid!("2dd022d6-2937-4754-89d6-02d2933a8f73"),
            uuid!("2dd022d6-2937-4754-89d6-02d2933a8f74"),
            uuid!("2dd022d6-2937-4754-89d6-02d2933a8f75"),
            uuid!("2dd022d6-2937-4754-89d6-02d2933a8f76"),
            uuid!("2dd022d6-2937-4754-89d6-02d2933a8f77"),
        ];
        host_ids
            .iter()
            .enumerate()
            .map(|(i, host_id)| {
                CassandraNode::new(
                    format!("172.16.1.{}:9044", i).parse().unwrap(),
                    "rack1".into(),
                    vec![],
                    *host_id,
                )
            })
            .collect()
    }

    /// Draws `count` nodes from the pool's round robin rotation and returns
    /// their addresses as strings for easy comparison.
    fn round_robin_addresses(pool: &mut NodePool, count: usize) -> Vec<String> {
        (0..count)
            .map(|_| {
                pool.get_round_robin_node_in_dc_rack("rack1")
                    .address
                    .to_string()
            })
            .collect()
    }

    #[test]
    fn test_round_robin() {
        let nodes = prepare_nodes();
        let mut node_pool = NodePool::new(nodes.clone());

        // Take nodes 1, 3 and 5 down.
        for i in [1, 3, 5] {
            node_pool.nodes[i].is_up = false;
        }

        // Only up nodes are included in the round robin rotation.
        assert_eq!(
            round_robin_addresses(&mut node_pool, nodes.len() - 1),
            vec![
                "172.16.1.2:9044",
                "172.16.1.4:9044",
                "172.16.1.6:9044",
                "172.16.1.7:9044",
                "172.16.1.0:9044",
                "172.16.1.2:9044",
                "172.16.1.4:9044",
            ],
        );

        // Bring the downed nodes back up.
        for i in [1, 3, 5] {
            node_pool.nodes[i].is_up = true;
        }

        // Newly up nodes re-enter the rotation on subsequent calls.
        assert_eq!(
            round_robin_addresses(&mut node_pool, nodes.len() - 1),
            vec![
                "172.16.1.3:9044",
                "172.16.1.4:9044",
                "172.16.1.5:9044",
                "172.16.1.6:9044",
                "172.16.1.7:9044",
                "172.16.1.0:9044",
                "172.16.1.1:9044",
            ],
        );
    }
}

0 comments on commit dcf76d4

Please sign in to comment.