From 14ea9e9691c4fd4695f169f279d305ffffad07d6 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 13 Sep 2022 15:01:16 +1000 Subject: [PATCH] CassandraSinkCluster fix routing for system keyspaces (#799) --- .../transforms/cassandra/sink_cluster/mod.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs index d010a1e61..07bb8563e 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs @@ -96,6 +96,7 @@ pub struct CassandraSinkCluster { local_table: FQName, peers_table: FQName, peers_v2_table: FQName, + system_keyspaces: [Identifier; 3], local_shotover_node: ShotoverNode, /// A local clone of topology_task_nodes. /// Internally stores connections to the nodes. @@ -126,6 +127,7 @@ impl Clone for CassandraSinkCluster { local_table: self.local_table.clone(), peers_table: self.peers_table.clone(), peers_v2_table: self.peers_v2_table.clone(), + system_keyspaces: self.system_keyspaces.clone(), local_shotover_node: self.local_shotover_node.clone(), local_nodes: vec![], topology_task_nodes: self.topology_task_nodes.clone(), @@ -171,6 +173,11 @@ impl CassandraSinkCluster { local_table: FQName::new("system", "local"), peers_table: FQName::new("system", "peers"), peers_v2_table: FQName::new("system", "peers_v2"), + system_keyspaces: [ + Identifier::parse("system"), + Identifier::parse("system_schema"), + Identifier::parse("system_distributed"), + ], local_shotover_node, local_nodes: vec![], topology_task_nodes: nodes_shared, @@ -266,7 +273,7 @@ impl CassandraSinkCluster { // system.local and system.peers must be routed to the same node otherwise the system.local node will be amongst the system.peers nodes and a node will be missing // DDL statements and system.local must be routed through the same connection, so that schema_version changes appear immediately in system.local || is_ddl_statement(&mut message) - || self.is_system_local_or_peers(&mut message) + || self.is_system_query(&mut message) { self.init_handshake_connection.as_mut().unwrap() } else if is_use_statement(&mut message) { @@ -715,13 +722,13 @@ impl CassandraSinkCluster { } // TODO: handle use statement state - fn is_system_local_or_peers(&self, request: &mut Message) -> bool { + fn is_system_query(&self, request: &mut Message) -> bool { if let Some(Frame::Cassandra(frame)) = request.frame() { if let CassandraOperation::Query { query, .. } = &mut frame.operation { if let CassandraStatement::Select(select) = query.as_ref() { - return self.local_table == select.table_name - || self.peers_table == select.table_name - || self.peers_v2_table == select.table_name; + if let Some(keyspace) = &select.table_name.keyspace { + return self.system_keyspaces.contains(keyspace); + } } } }