From 17ab71769c1a7a781a4309f174513b04f4b94f81 Mon Sep 17 00:00:00 2001 From: Conor Date: Thu, 13 Oct 2022 11:45:11 +1000 Subject: [PATCH] share prepared metadata between connections (#841) --- .../transforms/cassandra/sink_cluster/mod.rs | 3 +- .../cassandra/sink_cluster/node_pool.rs | 35 +++++++++++++------ 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs index 02020266a..9bbb9fa6d 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs @@ -332,6 +332,7 @@ impl CassandraSinkCluster { match self .pool .replica_node(execute, &metadata.version, &mut self.rng) + .await { Ok(Some(replica_node)) => { replica_node @@ -464,7 +465,7 @@ impl CassandraSinkCluster { for response in responses.iter_mut() { if let Some((id, metadata)) = get_prepared_result_message(response) { - self.pool.add_prepared_result(id, metadata); + self.pool.add_prepared_result(id, metadata).await; } } diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs index 1a351b98f..78664bdbf 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs @@ -9,7 +9,8 @@ use cassandra_protocol::token::Murmur3Token; use cassandra_protocol::types::CBytesShort; use rand::prelude::*; use std::collections::HashMap; -use tokio::sync::watch; +use std::sync::Arc; +use tokio::sync::{watch, RwLock}; pub enum GetReplicaErr { NoMetadata, @@ -18,17 +19,27 @@ pub enum GetReplicaErr { #[derive(Debug)] pub struct NodePool { - prepared_metadata: HashMap, + prepared_metadata: Arc>>, token_map: TokenMap, nodes: Vec, } +impl Clone for NodePool { + fn clone(&self) -> Self { + Self { + prepared_metadata: self.prepared_metadata.clone(), + token_map: TokenMap::new(&[]), + nodes: vec![], + } + } +} + impl NodePool { pub fn new(nodes: Vec) -> Self { Self { token_map: TokenMap::new(nodes.as_slice()), nodes, - prepared_metadata: HashMap::new(), + prepared_metadata: Arc::new(RwLock::new(HashMap::new())), } } @@ -54,8 +65,9 @@ impl NodePool { self.token_map = TokenMap::new(self.nodes.as_slice()); } - pub fn add_prepared_result(&mut self, id: CBytesShort, metadata: PreparedMetadata) { - self.prepared_metadata.insert(id, metadata); + pub async fn add_prepared_result(&mut self, id: CBytesShort, metadata: PreparedMetadata) { + let mut write_lock = self.prepared_metadata.write().await; + write_lock.insert(id, metadata); } pub fn get_random_node_in_dc_rack( @@ -71,16 +83,19 @@ impl NodePool { } /// Get a token routed replica node for the supplied execute message (if exists) - pub fn replica_node( + pub async fn replica_node( &mut self, execute: &BodyReqExecuteOwned, version: &Version, rng: &mut SmallRng, ) -> Result, GetReplicaErr> { - let metadata = self - .prepared_metadata - .get(&execute.id) - .ok_or(GetReplicaErr::NoMetadata)?; + let metadata = { + let read_lock = self.prepared_metadata.read().await; + read_lock + .get(&execute.id) + .ok_or(GetReplicaErr::NoMetadata)? + .clone() + }; let routing_key = calculate_routing_key( &metadata.pk_indexes,