From d19e190a2a0e4c25875a2d0655438e147f4aed23 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 16 Aug 2022 18:15:19 +1000 Subject: [PATCH] CassandraSinkCluster query system.local for local node info (#735) --- .../src/transforms/cassandra/sink_cluster.rs | 179 ++++++++++-------- .../tests/cassandra_int_tests/cluster.rs | 2 +- 2 files changed, 101 insertions(+), 80 deletions(-) diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster.rs index 97e091141..262a4e4fc 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster.rs @@ -10,11 +10,10 @@ use crate::transforms::util::Response; use crate::transforms::{Transform, Transforms, Wrapper}; use anyhow::{anyhow, Result}; use async_trait::async_trait; -use cassandra_protocol::consistency::Consistency; use cassandra_protocol::frame::Version; use cassandra_protocol::query::QueryParams; use cql3_parser::cassandra_statement::CassandraStatement; -use cql3_parser::common::{FQName, Identifier}; +use cql3_parser::common::FQName; use futures::StreamExt; use metrics::{register_counter, Counter}; use rand::prelude::*; @@ -262,8 +261,15 @@ pub fn create_topology_task( ) { tokio::spawn(async move { while let Some(handshake) = handshake_rx.recv().await { - if let Err(err) = topology_task_process(&tls, &nodes, handshake, &data_center).await { - tracing::error!("{err:?}"); + let mut attempts = 0; + while let Err(err) = topology_task_process(&tls, &nodes, &handshake, &data_center).await + { + tracing::error!("topology task failed, retrying, error was: {err:?}"); + attempts += 1; + if attempts > 3 { + // 3 attempts have failed, lets try a new handshake + break; + } } // Sleep for an hour. @@ -279,12 +285,12 @@ pub fn create_topology_task( async fn topology_task_process( tls: &Option, nodes: &Arc>>, - handshake: TaskHandshake, + handshake: &TaskHandshake, data_center: &str, ) -> Result<()> { - let outbound = new_connection(handshake.address, &handshake.handshake, tls, &None).await?; + let outbound = new_connection(&handshake.address, &handshake.handshake, tls, &None).await?; - let (return_chan_tx, return_chan_rx) = oneshot::channel(); + let (peers_tx, peers_rx) = oneshot::channel(); outbound.send( Message::from_frame(Frame::Cassandra(CassandraFrame { version: Version::V4, @@ -292,24 +298,38 @@ async fn topology_task_process( tracing_id: None, warnings: vec![], operation: CassandraOperation::Query { - query: Box::new(parse_statement_single("SELECT * FROM system.peers")), - params: Box::new(QueryParams { - consistency: Consistency::One, - with_names: false, - values: None, - page_size: Some(5000), - paging_state: None, - serial_consistency: None, - timestamp: Some(1643855761086585), - keyspace: None, - now_in_seconds: None, - }), + query: Box::new(parse_statement_single( + "SELECT peer, rack, data_center, tokens FROM system.peers", + )), + params: Box::new(QueryParams::default()), + }, + })), + peers_tx, + )?; + + let (local_tx, local_rx) = oneshot::channel(); + outbound.send( + Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + stream_id: 1, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Query { + query: Box::new(parse_statement_single( + "SELECT listen_address, rack, data_center, tokens FROM system.local", + )), + params: Box::new(QueryParams::default()), }, })), - return_chan_tx, + local_tx, )?; - let mut response = return_chan_rx.await?.response?; - let new_nodes = get_nodes_from_system_peers(&mut response, data_center); + + let (new_nodes, more_nodes) = tokio::join!( + async { system_peers_into_nodes(peers_rx.await?.response?, data_center) }, + async { system_peers_into_nodes(local_rx.await?.response?, data_center) } + ); + let mut new_nodes = new_nodes?; + new_nodes.extend(more_nodes?); let mut write_lock = nodes.write().await; let expensive_drop = std::mem::replace(&mut *write_lock, new_nodes); @@ -321,69 +341,70 @@ async fn topology_task_process( Ok(()) } -fn get_nodes_from_system_peers( - response: &mut Message, +fn system_peers_into_nodes( + mut response: Message, config_data_center: &str, -) -> Vec { - let mut new_nodes = vec![]; - let peer_ident = Identifier::Unquoted("peer".into()); - let rack_ident = Identifier::Unquoted("rack".into()); - let data_center_ident = Identifier::Unquoted("data_center".into()); - let tokens_ident = Identifier::Unquoted("tokens".into()); - +) -> Result> { if let Some(Frame::Cassandra(frame)) = response.frame() { - // CassandraOperation::Error(_) is another possible case, we should silently ignore such cases - if let CassandraOperation::Result(CassandraResult::Rows { - value: MessageValue::Rows(rows), - metadata, - }) = &mut frame.operation - { - for row in rows.iter() { - let mut address = None; - let mut rack = None; - let mut data_center = None; - let mut tokens = vec![]; - for (i, col) in metadata.col_specs.iter().enumerate() { - let ident = Identifier::parse(&col.name); - if ident == peer_ident { - if let Some(MessageValue::Inet(value)) = row.get(i) { - address = Some(*value); - } - } else if ident == rack_ident { - if let Some(MessageValue::Varchar(value)) = row.get(i) { - rack = Some(value.clone()); - } - } else if ident == data_center_ident { - if let Some(MessageValue::Varchar(value)) = row.get(i) { - data_center = Some(value.clone()); - } - } else if ident == tokens_ident { - if let Some(MessageValue::List(list)) = row.get(i) { - tokens = list - .iter() - .filter_map(|x| match x { - MessageValue::Varchar(a) => Some(a.clone()), - _ => None, - }) - .collect(); - } + match &mut frame.operation { + CassandraOperation::Result(CassandraResult::Rows { + value: MessageValue::Rows(rows), + .. + }) => rows + .iter_mut() + .filter(|row| { + if let Some(MessageValue::Varchar(data_center)) = row.get(2) { + data_center == config_data_center + } else { + false } - } - if let (Some(address), Some(rack), Some(data_center)) = (address, rack, data_center) - { - if data_center == config_data_center { - new_nodes.push(CassandraNode { - address, - _rack: rack, - _tokens: tokens, - outbound: None, - }); + }) + .map(|row| { + if row.len() != 4 { + return Err(anyhow!("expected 4 columns but was {}", row.len())); } - } - } + + let tokens = if let Some(MessageValue::List(list)) = row.pop() { + list.into_iter() + .map::, _>(|x| match x { + MessageValue::Varchar(a) => Ok(a), + _ => Err(anyhow!("tokens value not a varchar")), + }) + .collect::>>()? + } else { + return Err(anyhow!("tokens not a list")); + }; + let _data_center = row.pop(); + let rack = if let Some(MessageValue::Varchar(value)) = row.pop() { + value + } else { + return Err(anyhow!("rack not a varchar")); + }; + let address = if let Some(MessageValue::Inet(value)) = row.pop() { + value + } else { + return Err(anyhow!("address not an inet")); + }; + + Ok(CassandraNode { + address, + _rack: rack, + _tokens: tokens, + outbound: None, + }) + }) + .collect(), + operation => Err(anyhow!( + "system.peers returned unexpected cassandra operation: {:?}", + operation + )), } + } else { + Err(anyhow!( + "Failed to parse system.peers response {:?}", + response + )) } - new_nodes } fn is_use_statement(request: &mut Message) -> bool { diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster.rs b/shotover-proxy/tests/cassandra_int_tests/cluster.rs index 2fc36d8b6..6fe0aac75 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster.rs @@ -41,7 +41,7 @@ pub async fn test() { } // make assertions on the nodes list - assert_eq!(nodes.len(), 2); + assert_eq!(nodes.len(), 3); let mut possible_addresses: Vec = vec![ "172.16.1.2".parse().unwrap(), "172.16.1.3".parse().unwrap(),