Skip to content

Commit

Permalink
Move NodePool custom clone into a NodePoolBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Feb 8, 2023
1 parent fcc957d commit ba2779c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 27 deletions.
8 changes: 4 additions & 4 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use self::node_pool::PreparedMetadata;
use self::node_pool::{NodePoolBuilder, PreparedMetadata};
use crate::error::ChainResponse;
use crate::frame::cassandra::{parse_statement_single, CassandraMetadata, Tracing};
use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
Expand Down Expand Up @@ -119,7 +119,7 @@ pub struct CassandraSinkClusterBuilder {
nodes_rx: watch::Receiver<Vec<CassandraNode>>,
keyspaces_rx: KeyspaceChanRx,
task_handshake_tx: mpsc::Sender<TaskConnectionInfo>,
pool: NodePool,
pool: NodePoolBuilder,
}

impl CassandraSinkClusterBuilder {
Expand Down Expand Up @@ -160,7 +160,7 @@ impl CassandraSinkClusterBuilder {
nodes_rx: local_nodes_rx,
keyspaces_rx,
task_handshake_tx,
pool: NodePool::new(vec![]),
pool: NodePoolBuilder::new(),
}
}

Expand All @@ -184,7 +184,7 @@ impl CassandraSinkClusterBuilder {
failed_requests: self.failed_requests.clone(),
read_timeout: self.read_timeout,
local_shotover_node: self.local_shotover_node.clone(),
pool: self.pool.clone(),
pool: self.pool.build(),
// Because the self.nodes_rx is always copied from the original nodes_rx created before any node lists were sent,
// once a single node list has been sent all new connections will immediately recognize it as a change.
nodes_rx: self.nodes_rx.clone(),
Expand Down
44 changes: 24 additions & 20 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,38 +31,40 @@ pub struct KeyspaceMetadata {
pub replication_factor: usize,
}

#[derive(Debug)]
pub struct NodePool {
// Values in the builder are shared between transform instances that come from the same transform in the topology.yaml
#[derive(Clone)]
pub struct NodePoolBuilder {
prepared_metadata: Arc<RwLock<HashMap<CBytesShort, PreparedMetadata>>>,
keyspace_metadata: HashMap<String, KeyspaceMetadata>,
token_map: TokenMap,
nodes: Vec<CassandraNode>,
prev_idx: usize,
}

impl Clone for NodePool {
fn clone(&self) -> Self {
impl NodePoolBuilder {
pub fn new() -> Self {
Self {
prepared_metadata: Arc::new(RwLock::new(HashMap::new())),
}
}

pub fn build(&self) -> NodePool {
NodePool {
prepared_metadata: self.prepared_metadata.clone(),
keyspace_metadata: self.keyspace_metadata.clone(),
keyspace_metadata: HashMap::new(),
token_map: TokenMap::new(&[]),
nodes: vec![],
prev_idx: 0,
}
}
}

impl NodePool {
pub fn new(nodes: Vec<CassandraNode>) -> Self {
Self {
token_map: TokenMap::new(nodes.as_slice()),
nodes,
prepared_metadata: Arc::new(RwLock::new(HashMap::new())),
keyspace_metadata: HashMap::new(),
prev_idx: 0,
}
}
#[derive(Debug)]
pub struct NodePool {
prepared_metadata: Arc<RwLock<HashMap<CBytesShort, PreparedMetadata>>>,
keyspace_metadata: HashMap<String, KeyspaceMetadata>,
token_map: TokenMap,
nodes: Vec<CassandraNode>,
prev_idx: usize,
}

impl NodePool {
pub fn nodes(&mut self) -> &mut [CassandraNode] {
&mut self.nodes
}
Expand Down Expand Up @@ -210,7 +212,9 @@ mod test_node_pool {
fn test_round_robin() {
let nodes = prepare_nodes();

let mut node_pool = NodePool::new(nodes.clone());
let mut node_pool = NodePoolBuilder::new().build();
let (_nodes_tx, mut nodes_rx) = watch::channel(nodes.clone());
node_pool.update_nodes(&mut nodes_rx);

node_pool.nodes[1].is_up = false;
node_pool.nodes[3].is_up = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#[cfg(test)]
mod test_token_aware_router {
use super::super::node_pool::{KeyspaceMetadata, NodePool};
use super::super::node_pool::KeyspaceMetadata;
use super::super::routing_key::calculate_routing_key;
use crate::transforms::cassandra::sink_cluster::node::CassandraNode;
use crate::transforms::cassandra::sink_cluster::node_pool::PreparedMetadata;
use crate::transforms::cassandra::sink_cluster::node_pool::{
NodePoolBuilder, PreparedMetadata,
};
use crate::transforms::cassandra::sink_cluster::{KeyspaceChanRx, KeyspaceChanTx};
use cassandra_protocol::consistency::Consistency::One;
use cassandra_protocol::frame::message_execute::BodyReqExecuteOwned;
Expand All @@ -23,7 +25,10 @@ mod test_token_aware_router {
async fn test_router() {
let mut rng = SmallRng::from_rng(rand::thread_rng()).unwrap();

let mut router = NodePool::new(prepare_nodes());
let nodes = prepare_nodes();
let mut router = NodePoolBuilder::new().build();
let (_nodes_tx, mut nodes_rx) = watch::channel(nodes);
router.update_nodes(&mut nodes_rx);

let id = CBytesShort::new(vec![
11, 241, 38, 11, 140, 72, 217, 34, 214, 128, 175, 241, 151, 73, 197, 227,
Expand Down

0 comments on commit ba2779c

Please sign in to comment.