From d88886f58e0a8b10bedd72d08d1e638404dffe33 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Fri, 26 Aug 2022 15:12:54 +1000 Subject: [PATCH] CassandraSinkCluster system.peers rewrite (#756) * CassandraSinkCluster system.peers rewrite * Use configure via nodes list instead of peers list --- .../topology_rack1.yaml | 17 +- .../topology_rack2.yaml | 17 +- .../topology_rack3.yaml | 17 +- .../cassandra-cluster-tls/topology.yaml | 8 +- .../topology-dummy-peers.yaml | 29 ++ .../cassandra-cluster/topology.yaml | 10 +- .../transforms/cassandra/sink_cluster/mod.rs | 341 +++++++++++++----- .../tests/cassandra_int_tests/cluster.rs | 80 +++- .../cassandra_int_tests/cluster_multi_rack.rs | 46 ++- .../tests/cassandra_int_tests/mod.rs | 14 + 10 files changed, 462 insertions(+), 117 deletions(-) create mode 100644 shotover-proxy/example-configs/cassandra-cluster/topology-dummy-peers.yaml diff --git a/shotover-proxy/example-configs/cassandra-cluster-multi-rack/topology_rack1.yaml b/shotover-proxy/example-configs/cassandra-cluster-multi-rack/topology_rack1.yaml index 487b7a7c4..62ceffd3b 100644 --- a/shotover-proxy/example-configs/cassandra-cluster-multi-rack/topology_rack1.yaml +++ b/shotover-proxy/example-configs/cassandra-cluster-multi-rack/topology_rack1.yaml @@ -7,8 +7,19 @@ chain_config: main_chain: - CassandraSinkCluster: first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"] - data_center: "dc1" - rack: "rack1" - host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + local_shotover_host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + shotover_nodes: + - address: "127.0.0.1:9042" + data_center: "dc1" + rack: "rack1" + host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + - address: "127.0.0.2:9042" + data_center: "dc1" + rack: "rack2" + host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4" + - address: "127.0.0.3:9042" + data_center: "dc1" + rack: "rack3" + host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b" source_to_chain_mapping: cassandra_prod: main_chain diff --git a/shotover-proxy/example-configs/cassandra-cluster-multi-rack/topology_rack2.yaml b/shotover-proxy/example-configs/cassandra-cluster-multi-rack/topology_rack2.yaml index e318ccbac..e783bf0ce 100644 --- a/shotover-proxy/example-configs/cassandra-cluster-multi-rack/topology_rack2.yaml +++ b/shotover-proxy/example-configs/cassandra-cluster-multi-rack/topology_rack2.yaml @@ -7,8 +7,19 @@ chain_config: main_chain: - CassandraSinkCluster: first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"] - data_center: "dc1" - rack: "rack2" - host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4" + local_shotover_host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4" + shotover_nodes: + - address: "127.0.0.1:9042" + data_center: "dc1" + rack: "rack1" + host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + - address: "127.0.0.2:9042" + data_center: "dc1" + rack: "rack2" + host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4" + - address: "127.0.0.3:9042" + data_center: "dc1" + rack: "rack3" + host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b" source_to_chain_mapping: cassandra_prod: main_chain diff --git a/shotover-proxy/example-configs/cassandra-cluster-multi-rack/topology_rack3.yaml b/shotover-proxy/example-configs/cassandra-cluster-multi-rack/topology_rack3.yaml index f43c89d9e..d48e99403 100644 --- a/shotover-proxy/example-configs/cassandra-cluster-multi-rack/topology_rack3.yaml +++ b/shotover-proxy/example-configs/cassandra-cluster-multi-rack/topology_rack3.yaml @@ -7,8 +7,19 @@ chain_config: main_chain: - CassandraSinkCluster: first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"] - data_center: "dc1" - rack: "rack3" - host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b" + local_shotover_host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b" + shotover_nodes: + - address: "127.0.0.1:9042" + data_center: "dc1" + rack: "rack1" + host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + - address: "127.0.0.2:9042" + data_center: "dc1" + rack: "rack2" + host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4" + - address: "127.0.0.3:9042" + data_center: "dc1" + rack: "rack3" + host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b" source_to_chain_mapping: cassandra_prod: main_chain diff --git a/shotover-proxy/example-configs/cassandra-cluster-tls/topology.yaml b/shotover-proxy/example-configs/cassandra-cluster-tls/topology.yaml index 0af805965..b3d2906f5 100644 --- a/shotover-proxy/example-configs/cassandra-cluster-tls/topology.yaml +++ b/shotover-proxy/example-configs/cassandra-cluster-tls/topology.yaml @@ -11,9 +11,13 @@ chain_config: main_chain: - CassandraSinkCluster: first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"] - data_center: "dc1" - rack: "rack1" host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + local_shotover_host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + shotover_nodes: + - address: "127.0.0.1:9042" + data_center: "dc1" + rack: "rack1" + host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" tls: certificate_authority_path: "example-configs/cassandra-tls/certs/localhost_CA.crt" certificate_path: "example-configs/cassandra-tls/certs/localhost.crt" diff --git a/shotover-proxy/example-configs/cassandra-cluster/topology-dummy-peers.yaml b/shotover-proxy/example-configs/cassandra-cluster/topology-dummy-peers.yaml new file mode 100644 index 000000000..14dfdaa7b --- /dev/null +++ b/shotover-proxy/example-configs/cassandra-cluster/topology-dummy-peers.yaml @@ -0,0 +1,29 @@ +--- +sources: + cassandra_prod: + Cassandra: + listen_addr: "127.0.0.1:9042" +chain_config: + main_chain: + - CassandraSinkCluster: + first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"] + local_shotover_host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + shotover_nodes: + - address: "127.0.0.1:9042" + data_center: "dc1" + rack: "rack1" + host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + # These extra nodes dont really make sense, its pointing at the same address as the local shotover node. + # It is however useful for testing the functionality of the system.peers rewriting. + # We can make stronger assertions against the values returned by system.peers with this config because + # more system.peers fields are static due to always being queried against this one shotover instance. + - address: "127.0.0.1:9042" + data_center: "dc1" + rack: "rack1" + host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4" + - address: "127.0.0.1:9042" + data_center: "dc1" + rack: "rack1" + host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b" +source_to_chain_mapping: + cassandra_prod: main_chain diff --git a/shotover-proxy/example-configs/cassandra-cluster/topology.yaml b/shotover-proxy/example-configs/cassandra-cluster/topology.yaml index 487b7a7c4..9dc021951 100644 --- a/shotover-proxy/example-configs/cassandra-cluster/topology.yaml +++ b/shotover-proxy/example-configs/cassandra-cluster/topology.yaml @@ -7,8 +7,12 @@ chain_config: main_chain: - CassandraSinkCluster: first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"] - data_center: "dc1" - rack: "rack1" - host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + local_shotover_host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + shotover_nodes: + - address: "127.0.0.1:9042" + data_center: "dc1" + rack: "rack1" + host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + source_to_chain_mapping: cassandra_prod: main_chain diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs index 2bc895cc6..f7569e791 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs @@ -2,7 +2,7 @@ use crate::codec::cassandra::CassandraCodec; use crate::error::ChainResponse; use crate::frame::cassandra::parse_statement_single; use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame}; -use crate::message::{Message, MessageValue, Messages}; +use crate::message::{IntSize, Message, MessageValue, Messages}; use crate::tls::{TlsConnector, TlsConnectorConfig}; use crate::transforms::cassandra::connection::CassandraConnection; use crate::transforms::util::Response; @@ -36,9 +36,8 @@ pub struct CassandraSinkClusterConfig { /// If this is not followed, shotover's invariants will still be upheld but shotover will communicate with a /// node outside of the specified data_center and rack. pub first_contact_points: Vec, - pub data_center: String, - pub rack: String, - pub host_id: Uuid, + pub local_shotover_host_id: Uuid, + pub shotover_nodes: Vec, pub tls: Option, pub read_timeout: Option, } @@ -46,20 +45,41 @@ pub struct CassandraSinkClusterConfig { impl CassandraSinkClusterConfig { pub async fn get_transform(&self, chain_name: String) -> Result { let tls = self.tls.clone().map(TlsConnector::new).transpose()?; + let mut shotover_nodes = self.shotover_nodes.clone(); + let index = self + .shotover_nodes + .iter() + .position(|x| x.host_id == self.local_shotover_host_id) + .ok_or_else(|| { + anyhow!( + "local host_id {} was missing in shotover_nodes", + self.local_shotover_host_id + ) + })?; + let local_node = shotover_nodes.remove(index); + Ok(Transforms::CassandraSinkCluster(CassandraSinkCluster::new( self.first_contact_points.clone(), + shotover_nodes, chain_name, - self.data_center.clone(), - self.rack.clone(), - self.host_id, + local_node, tls, self.read_timeout, ))) } } +#[derive(Deserialize, Debug, Clone)] +pub struct ShotoverNode { + pub address: SocketAddr, + pub data_center: String, + pub rack: String, + pub host_id: Uuid, +} + pub struct CassandraSinkCluster { contact_points: Vec, + shotover_peers: Vec, init_handshake_connection: Option, init_handshake: Vec, init_handshake_address: Option, @@ -71,10 +91,9 @@ pub struct CassandraSinkCluster { pushed_messages_tx: Option>, read_timeout: Option, local_table: FQName, - peer_table: FQName, - data_center: String, - rack: String, - host_id: Uuid, + peers_table: FQName, + peers_v2_table: FQName, + local_shotover_node: ShotoverNode, /// A local clone of topology_task_nodes /// Internally stores connections to the nodes local_nodes: Vec, @@ -89,6 +108,7 @@ impl Clone for CassandraSinkCluster { fn clone(&self) -> Self { CassandraSinkCluster { contact_points: self.contact_points.clone(), + shotover_peers: self.shotover_peers.clone(), init_handshake_connection: None, init_handshake: vec![], init_handshake_address: None, @@ -100,10 +120,9 @@ impl Clone for CassandraSinkCluster { pushed_messages_tx: None, read_timeout: self.read_timeout, local_table: self.local_table.clone(), - peer_table: self.peer_table.clone(), - data_center: self.data_center.clone(), - rack: self.rack.clone(), - host_id: self.host_id, + peers_table: self.peers_table.clone(), + peers_v2_table: self.peers_v2_table.clone(), + local_shotover_node: self.local_shotover_node.clone(), local_nodes: vec![], topology_task_nodes: self.topology_task_nodes.clone(), rng: SmallRng::from_rng(rand::thread_rng()).unwrap(), @@ -113,12 +132,12 @@ impl Clone for CassandraSinkCluster { } impl CassandraSinkCluster { + #[allow(clippy::too_many_arguments)] pub fn new( contact_points: Vec, + shotover_peers: Vec, chain_name: String, - data_center: String, - rack: String, - host_id: Uuid, + local_shotover_node: ShotoverNode, tls: Option, timeout: Option, ) -> CassandraSinkCluster { @@ -132,11 +151,12 @@ impl CassandraSinkCluster { tls.clone(), nodes_shared.clone(), task_handshake_rx, - data_center.clone(), + local_shotover_node.data_center.clone(), ); CassandraSinkCluster { contact_points, + shotover_peers, init_handshake_connection: None, init_handshake: vec![], init_handshake_address: None, @@ -148,10 +168,9 @@ impl CassandraSinkCluster { pushed_messages_tx: None, read_timeout: receive_timeout, local_table: FQName::new("system", "local"), - peer_table: FQName::new("system", "peers"), - data_center, - rack, - host_id, + peers_table: FQName::new("system", "peers"), + peers_v2_table: FQName::new("system", "peers_v2"), + local_shotover_node, local_nodes: vec![], topology_task_nodes: nodes_shared, rng: SmallRng::from_rng(rand::thread_rng()).unwrap(), @@ -160,12 +179,22 @@ impl CassandraSinkCluster { } } +fn create_query(messages: &Messages, query: &str, version: Version) -> Result { + let stream_id = get_unused_stream_id(messages)?; + Ok(Message::from_frame(Frame::Cassandra(CassandraFrame { + version, + stream_id, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Query { + query: Box::new(parse_statement_single(query)), + params: Box::new(QueryParams::default()), + }, + }))) +} + impl CassandraSinkCluster { - async fn send_message( - &mut self, - mut messages: Messages, - local_addr: SocketAddr, - ) -> ChainResponse { + async fn send_message(&mut self, mut messages: Messages) -> ChainResponse { // Attempt to populate nodes list if we still dont have one yet if self.local_nodes.is_empty() { let nodes_shared = self.topology_task_nodes.read().await; @@ -179,23 +208,16 @@ impl CassandraSinkCluster { .collect(); for table_to_rewrite in tables_to_rewrite.iter().rev() { - let stream_id = get_unused_stream_id(&messages)?; - - if let RewriteTableTy::Local = table_to_rewrite.ty { + let query = "SELECT rack, data_center, schema_version, tokens, release_version FROM system.peers"; + messages.insert( + table_to_rewrite.index + 1, + create_query(&messages, query, table_to_rewrite.version)?, + ); + if let RewriteTableTy::Peers = table_to_rewrite.ty { + let query = "SELECT rack, data_center, schema_version, tokens, release_version FROM system.local"; messages.insert( - table_to_rewrite.index+1, - Message::from_frame(Frame::Cassandra(CassandraFrame { - version: table_to_rewrite.version, - stream_id, - tracing_id: None, - warnings: vec![], - operation: CassandraOperation::Query { - query: Box::new(parse_statement_single( - "SELECT rack, data_center, schema_version, tokens, release_version FROM system.peers", - )), - params: Box::new(QueryParams::default()), - }, - })), + table_to_rewrite.index + 2, + create_query(&messages, query, table_to_rewrite.version)?, ); } } @@ -327,8 +349,7 @@ impl CassandraSinkCluster { } for table_to_rewrite in tables_to_rewrite { - self.rewrite_table(table_to_rewrite, local_addr, &mut responses) - .await?; + self.rewrite_table(table_to_rewrite, &mut responses).await?; } Ok(responses) @@ -341,7 +362,10 @@ impl CassandraSinkCluster { if let CassandraStatement::Select(select) = query.as_ref() { let ty = if self.local_table == select.table_name { RewriteTableTy::Local - } else if self.peer_table == select.table_name { + } else if self.peers_table == select.table_name + || self.peers_v2_table == select.table_name + { + // TODO: fail if WHERE exists RewriteTableTy::Peers } else { return None; @@ -362,24 +386,30 @@ impl CassandraSinkCluster { async fn rewrite_table( &mut self, table: TableToRewrite, - local_addr: SocketAddr, responses: &mut Vec, ) -> Result<()> { - match table.ty { - RewriteTableTy::Local => { - if table.index + 1 < responses.len() { - let peers_response = responses.remove(table.index + 1); + if table.index + 1 < responses.len() { + let peers_response = responses.remove(table.index + 1); + match table.ty { + RewriteTableTy::Local => { if let Some(local_response) = responses.get_mut(table.index) { - self.rewrite_table_local(table, local_addr, local_response, peers_response) + self.rewrite_table_local(table, local_response, peers_response) .await?; local_response.invalidate_cache(); } } - } - RewriteTableTy::Peers => { - if let Some(peers_response) = responses.get_mut(table.index) { - self.rewrite_table_peers(peers_response).await?; - peers_response.invalidate_cache(); + RewriteTableTy::Peers => { + if table.index + 1 < responses.len() { + let local_response = responses.remove(table.index + 1); + if let Some(client_peers_response) = responses.get_mut(table.index) { + let mut nodes = parse_system_nodes(peers_response)?; + nodes.extend(parse_system_nodes(local_response)?); + + self.rewrite_table_peers(table, client_peers_response, nodes) + .await?; + client_peers_response.invalidate_cache(); + } + } } } } @@ -387,16 +417,136 @@ impl CassandraSinkCluster { Ok(()) } - async fn rewrite_table_peers(&mut self, peers_response: &mut Message) -> Result<()> { - // TODO: generate rows for shotover peers - // the current implementation will at least direct all traffic through shotover + async fn rewrite_table_peers( + &mut self, + table: TableToRewrite, + peers_response: &mut Message, + nodes: Vec, + ) -> Result<()> { + let mut data_center_alias = "data_center"; + let mut rack_alias = "rack"; + let mut host_id_alias = "host_id"; + let mut preferred_ip_alias = "preferred_ip"; + let mut preferred_port_alias = "preferred_port"; + let mut rpc_address_alias = "rpc_address"; + let mut peer_alias = "peer"; + let mut peer_port_alias = "peer_port"; + let mut release_version_alias = "release_version"; + let mut tokens_alias = "tokens"; + let mut schema_version_alias = "schema_version"; + for select in &table.selects { + if let SelectElement::Column(column) = select { + if let Some(alias) = &column.alias { + let alias = match alias { + Identifier::Unquoted(alias) => alias, + Identifier::Quoted(alias) => alias, + }; + if column.name == Identifier::Unquoted("data_center".to_string()) { + data_center_alias = alias; + } else if column.name == Identifier::Unquoted("rack".to_string()) { + rack_alias = alias; + } else if column.name == Identifier::Unquoted("host_id".to_string()) { + host_id_alias = alias; + } else if column.name == Identifier::Unquoted("preferred_ip".to_string()) { + preferred_ip_alias = alias; + } else if column.name == Identifier::Unquoted("preferred_port".to_string()) { + preferred_port_alias = alias; + } else if column.name == Identifier::Unquoted("rpc_address".to_string()) { + rpc_address_alias = alias; + } else if column.name == Identifier::Unquoted("peer".to_string()) { + peer_alias = alias; + } else if column.name == Identifier::Unquoted("peer_port".to_string()) { + peer_port_alias = alias; + } else if column.name == Identifier::Unquoted("release_version".to_string()) { + release_version_alias = alias; + } else if column.name == Identifier::Unquoted("tokens".to_string()) { + tokens_alias = alias; + } else if column.name == Identifier::Unquoted("schema_version".to_string()) { + schema_version_alias = alias; + } + } + } + } + if let Some(Frame::Cassandra(frame)) = peers_response.frame() { if let CassandraOperation::Result(CassandraResult::Rows { value: MessageValue::Rows(rows), - .. + metadata, }) = &mut frame.operation { - rows.clear(); + *rows = self + .shotover_peers + .iter() + .map(|shotover_peer| { + let mut release_version = "".to_string(); + let mut schema_version = None; + let mut tokens = vec![]; + for node in &nodes { + if node.data_center == shotover_peer.data_center + && node.rack == shotover_peer.rack + { + if release_version.is_empty() { + release_version = node.release_version.clone(); + } + if let Ok(Cmp::Lt) = version_compare::compare( + &node.release_version, + &release_version, + ) { + release_version = node.release_version.clone(); + } + + match &mut schema_version { + Some(schema_version) => { + if &node.schema_version != schema_version { + *schema_version = Uuid::new_v4(); + } + } + None => schema_version = Some(node.schema_version), + } + tokens.extend(node.tokens.iter().cloned()); + } + } + tokens.sort(); + + metadata + .col_specs + .iter() + .map(|colspec| { + if colspec.name == data_center_alias { + MessageValue::Varchar(shotover_peer.data_center.clone()) + } else if colspec.name == rack_alias { + MessageValue::Varchar(shotover_peer.rack.clone()) + } else if colspec.name == host_id_alias { + MessageValue::Uuid(shotover_peer.host_id) + } else if colspec.name == preferred_ip_alias + || colspec.name == preferred_port_alias + || colspec.name == rpc_address_alias + { + MessageValue::Null + } else if colspec.name == peer_alias { + MessageValue::Inet(shotover_peer.address.ip()) + } else if colspec.name == peer_port_alias { + MessageValue::Integer( + shotover_peer.address.port() as i64, + IntSize::I32, + ) + } else if colspec.name == release_version_alias { + MessageValue::Varchar(release_version.clone()) + } else if colspec.name == tokens_alias { + MessageValue::List(tokens.clone()) + } else if colspec.name == schema_version_alias { + MessageValue::Uuid(schema_version.unwrap_or_else(Uuid::new_v4)) + } else { + tracing::warn!( + "Unknown column name in system.peers/system.peers_v2: {}", + colspec.name + ); + MessageValue::Null + } + }) + .collect() + }) + .collect(); } Ok(()) } else { @@ -410,11 +560,14 @@ impl CassandraSinkCluster { async fn rewrite_table_local( &mut self, table: TableToRewrite, - local_address: SocketAddr, local_response: &mut Message, peers_response: Message, ) -> Result<()> { - let peers = parse_system_peers(peers_response, &self.data_center, &self.rack)?; + let mut peers = parse_system_nodes(peers_response)?; + peers.retain(|node| { + node.data_center == self.local_shotover_node.data_center + && node.rack == self.local_shotover_node.rack + }); let mut release_version_alias = "release_version"; let mut tokens_alias = "tokens"; @@ -466,17 +619,18 @@ impl CassandraSinkCluster { for (col, col_meta) in row.iter_mut().zip(metadata.col_specs.iter()) { if col_meta.name == rack_alias { if let MessageValue::Varchar(rack) = col { - is_in_rack = rack == &self.rack; + is_in_rack = rack == &self.local_shotover_node.rack; if !is_in_rack { - *rack = self.rack.clone(); + *rack = self.local_shotover_node.rack.clone(); tracing::warn!("A contact point node is not in the configured rack, this node will receive traffic from outside of its rack"); } } } else if col_meta.name == data_center_alias { if let MessageValue::Varchar(data_center) = col { - is_in_data_center = data_center == &self.data_center; + is_in_data_center = + data_center == &self.local_shotover_node.data_center; if !is_in_data_center { - *data_center = self.data_center.clone(); + *data_center = self.local_shotover_node.data_center.clone(); tracing::warn!("A contact point node is not in the configured data_center, this node will receive traffic from outside of its data_center"); } } @@ -530,11 +684,11 @@ impl CassandraSinkCluster { || col_meta.name == listen_address_alias { if let MessageValue::Inet(address) = col { - *address = local_address.ip(); + *address = self.local_shotover_node.address.ip(); } } else if col_meta.name == host_id_alias { if let MessageValue::Uuid(host_id) = col { - *host_id = self.host_id; + *host_id = self.local_shotover_node.host_id; } } } @@ -555,7 +709,8 @@ impl CassandraSinkCluster { if let CassandraOperation::Query { query, .. } = &mut frame.operation { if let CassandraStatement::Select(select) = query.as_ref() { return self.local_table == select.table_name - || self.peer_table == select.table_name; + || self.peers_table == select.table_name + || self.peers_v2_table == select.table_name; } } } @@ -800,17 +955,15 @@ fn get_unused_stream_id(messages: &Messages) -> Result { Err(anyhow!("Ran out of stream ids")) } -struct SystemPeer { +struct NodeInfo { tokens: Vec, schema_version: Uuid, release_version: String, + rack: String, + data_center: String, } -fn parse_system_peers( - mut response: Message, - config_data_center: &str, - config_rack: &str, -) -> Result> { +fn parse_system_nodes(mut response: Message) -> Result> { if let Some(Frame::Cassandra(frame)) = response.frame() { match &mut frame.operation { CassandraOperation::Result(CassandraResult::Rows { @@ -818,17 +971,6 @@ fn parse_system_peers( .. }) => rows .iter_mut() - .filter(|row| { - if let ( - Some(MessageValue::Varchar(data_center)), - Some(MessageValue::Varchar(rack)), - ) = (row.get(1), row.get(0)) - { - data_center == config_data_center && rack == config_rack - } else { - false - } - }) .map(|row| { if row.len() != 5 { return Err(anyhow!("expected 5 columns but was {}", row.len())); @@ -837,7 +979,7 @@ fn parse_system_peers( let release_version = if let Some(MessageValue::Varchar(value)) = row.pop() { value } else { - return Err(anyhow!("release_version not a list")); + return Err(anyhow!("release_version not a varchar")); }; let tokens = if let Some(MessageValue::List(value)) = row.pop() { @@ -852,13 +994,23 @@ fn parse_system_peers( return Err(anyhow!("schema_version not a uuid")); }; - let _data_center = row.pop(); - let _rack = row.pop(); + let data_center = if let Some(MessageValue::Varchar(value)) = row.pop() { + value + } else { + return Err(anyhow!("data_center not a varchar")); + }; + let rack = if let Some(MessageValue::Varchar(value)) = row.pop() { + value + } else { + return Err(anyhow!("rack not a varchar")); + }; - Ok(SystemPeer { + Ok(NodeInfo { tokens, schema_version, release_version, + data_center, + rack, }) }) .collect(), @@ -878,8 +1030,7 @@ fn parse_system_peers( #[async_trait] impl Transform for CassandraSinkCluster { async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse { - self.send_message(message_wrapper.messages, message_wrapper.local_addr) - .await + self.send_message(message_wrapper.messages).await } fn is_terminating(&self) -> bool { diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster.rs b/shotover-proxy/tests/cassandra_int_tests/cluster.rs index cc4fd3022..751faf419 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster.rs @@ -9,11 +9,77 @@ use std::net::IpAddr; use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; -async fn test_rewrite_system_local(connection: &CassandraConnection) { +async fn test_rewrite_system_peers(connection: &CassandraConnection) { + let all_columns = "peer, data_center, host_id, preferred_ip, rack, release_version, rpc_address, schema_version, tokens"; assert_query_result(connection, "SELECT * FROM system.peers;", &[]).await; - assert_query_result(connection, "SELECT peer FROM system.peers;", &[]).await; - assert_query_result(connection, "SELECT peer, peer FROM system.peers;", &[]).await; + assert_query_result( + connection, + &format!("SELECT {all_columns} FROM system.peers;"), + &[], + ) + .await; + assert_query_result( + connection, + &format!("SELECT {all_columns}, {all_columns} FROM system.peers;"), + &[], + ) + .await; +} +async fn test_rewrite_system_peers_dummy_peers(connection: &CassandraConnection) { + let star_results1 = [ + ResultValue::Inet("127.0.0.1".parse().unwrap()), + ResultValue::Varchar("dc1".into()), + ResultValue::Uuid("3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4".parse().unwrap()), + ResultValue::Inet("255.255.255.255".into()), + ResultValue::Varchar("rack1".into()), + ResultValue::Varchar("3.11.13".into()), + ResultValue::Inet("255.255.255.255".into()), + // schema_version is non deterministic so we cant assert on it. + ResultValue::Any, + // Unfortunately token generation appears to be non-deterministic but we can at least assert that + // there are 128 tokens per node + ResultValue::Set(std::iter::repeat(ResultValue::Any).take(3 * 128).collect()), + ]; + let star_results2 = [ + ResultValue::Inet("127.0.0.1".parse().unwrap()), + ResultValue::Varchar("dc1".into()), + ResultValue::Uuid("fa74d7ec-1223-472b-97de-04a32ccdb70b".parse().unwrap()), + ResultValue::Inet("255.255.255.255".into()), + ResultValue::Varchar("rack1".into()), + ResultValue::Varchar("3.11.13".into()), + ResultValue::Inet("255.255.255.255".into()), + // schema_version is non deterministic so we cant assert on it. + ResultValue::Any, + // Unfortunately token generation appears to be non-deterministic but we can at least assert that + // there are 128 tokens per node + ResultValue::Set(std::iter::repeat(ResultValue::Any).take(3 * 128).collect()), + ]; + + let all_columns = "peer, data_center, host_id, preferred_ip, rack, release_version, rpc_address, schema_version, tokens"; + assert_query_result( + connection, + "SELECT * FROM system.peers;", + &[&star_results1, &star_results2], + ) + .await; + assert_query_result( + connection, + &format!("SELECT {all_columns} FROM system.peers;"), + &[&star_results1, &star_results2], + ) + .await; + assert_query_result( + connection, + &format!("SELECT {all_columns}, {all_columns} FROM system.peers;"), + &[ + &[star_results1.as_slice(), star_results1.as_slice()].concat(), + &[star_results2.as_slice(), star_results2.as_slice()].concat(), + ], + ) + .await; +} +async fn test_rewrite_system_local(connection: &CassandraConnection) { let star_results = [ ResultValue::Varchar("local".into()), ResultValue::Varchar("COMPLETED".into()), @@ -62,6 +128,12 @@ async fn test_rewrite_system_local(connection: &CassandraConnection) { pub async fn test(connection: &CassandraConnection) { test_rewrite_system_local(connection).await; + test_rewrite_system_peers(connection).await; +} + +pub async fn test_dummy_peers(connection: &CassandraConnection) { + test_rewrite_system_local(connection).await; + test_rewrite_system_peers_dummy_peers(connection).await; } pub async fn test_topology_task(ca_path: Option<&str>) { @@ -84,9 +156,7 @@ pub async fn test_topology_task(ca_path: Option<&str>) { assert_eq!(node._tokens.len(), 128); } } - pub async fn run_topology_task(ca_path: Option<&str>) -> Vec { - // Directly test the internal topology task let nodes_shared = Arc::new(RwLock::new(vec![])); let (task_handshake_tx, task_handshake_rx) = mpsc::channel(1); let tls = ca_path.map(|ca_path| { diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster_multi_rack.rs b/shotover-proxy/tests/cassandra_int_tests/cluster_multi_rack.rs index 63c968be1..3f02e826c 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster_multi_rack.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster_multi_rack.rs @@ -3,18 +3,58 @@ use crate::helpers::cassandra::{assert_query_result, CassandraConnection, Result use std::net::IpAddr; async fn test_rewrite_system_peers(connection: &CassandraConnection) { + let star_results1 = [ + // peer is non-determistic because we dont know which node this will be + ResultValue::Any, + ResultValue::Varchar("dc1".into()), + // host_id is non-determistic because we dont know which node this will be + ResultValue::Any, + ResultValue::Inet("255.255.255.255".into()), + // rack is non-determistic because we dont know which node this will be + ResultValue::Any, + ResultValue::Varchar("3.11.13".into()), + ResultValue::Inet("255.255.255.255".into()), + // schema_version is non deterministic so we cant assert on it. + ResultValue::Any, + // Unfortunately token generation appears to be non-deterministic but we can at least assert that + // there are 128 tokens per node + ResultValue::Set(std::iter::repeat(ResultValue::Any).take(128).collect()), + ]; + let star_results2 = [ + ResultValue::Any, + ResultValue::Varchar("dc1".into()), + ResultValue::Any, + ResultValue::Inet("255.255.255.255".into()), + ResultValue::Any, + ResultValue::Varchar("3.11.13".into()), + ResultValue::Inet("255.255.255.255".into()), + // schema_version is non deterministic so we cant assert on it. + ResultValue::Any, + // Unfortunately token generation appears to be non-deterministic but we can at least assert that + // there are 128 tokens per node + ResultValue::Set(std::iter::repeat(ResultValue::Any).take(128).collect()), + ]; + let all_columns = "peer, data_center, host_id, preferred_ip, rack, release_version, rpc_address, schema_version, tokens"; - assert_query_result(connection, "SELECT * FROM system.peers;", &[]).await; + assert_query_result( + connection, + "SELECT * FROM system.peers;", + &[&star_results1, &star_results2], + ) + .await; assert_query_result( connection, &format!("SELECT {all_columns} FROM system.peers;"), - &[], + &[&star_results1, &star_results2], ) .await; assert_query_result( connection, &format!("SELECT {all_columns}, {all_columns} FROM system.peers;"), - &[], + &[ + &[star_results1.as_slice(), star_results1.as_slice()].concat(), + &[star_results2.as_slice(), star_results2.as_slice()].concat(), + ], ) .await; } diff --git a/shotover-proxy/tests/cassandra_int_tests/mod.rs b/shotover-proxy/tests/cassandra_int_tests/mod.rs index e3e9bbb56..30dd33f57 100644 --- a/shotover-proxy/tests/cassandra_int_tests/mod.rs +++ b/shotover-proxy/tests/cassandra_int_tests/mod.rs @@ -124,6 +124,20 @@ async fn test_cluster_single_rack() { native_types::test(&connection2).await; } + { + let shotover_manager = ShotoverManager::from_topology_file( + "example-configs/cassandra-cluster/topology-dummy-peers.yaml", + ); + + let mut connection = shotover_manager + .cassandra_connection("127.0.0.1", 9042) + .await; + connection + .enable_schema_awaiter("172.16.1.2:9042", None) + .await; + cluster::test_dummy_peers(&connection).await; + } + cluster::test_topology_task(None).await; }