diff --git a/shotover-proxy/src/transforms/cassandra/connection.rs b/shotover-proxy/src/transforms/cassandra/connection.rs
index 286981ffa..ac2cdf660 100644
--- a/shotover-proxy/src/transforms/cassandra/connection.rs
+++ b/shotover-proxy/src/transforms/cassandra/connection.rs
@@ -12,6 +12,7 @@ use derivative::Derivative;
 use futures::stream::FuturesOrdered;
 use futures::{SinkExt, StreamExt};
 use halfbrown::HashMap;
+use std::net::SocketAddr;
 use std::time::Duration;
 use tokio::io::{split, AsyncRead, AsyncWrite, ReadHalf, WriteHalf};
 use tokio::net::ToSocketAddrs;
@@ -27,10 +28,25 @@ struct Request {
     stream_id: i16,
 }
 
-#[derive(Debug)]
-pub struct Response {
+pub type Response = Result<Message, ResponseError>;
+
+#[derive(Debug, thiserror::Error)]
+#[error("Connection to destination cassandra node {destination} was closed: {cause}")]
+pub struct ResponseError {
+    #[source]
+    pub cause: anyhow::Error,
+    pub destination: SocketAddr,
     pub stream_id: i16,
-    pub response: Result<Message>,
+}
+
+impl ResponseError {
+    pub fn to_response(&self, version: Version) -> Message {
+        Message::from_frame(Frame::Cassandra(CassandraFrame::shotover_error(
+            self.stream_id,
+            version,
+            &format!("{:#}", self.cause),
+        )))
+    }
 }
 
 #[derive(Debug)]
@@ -59,7 +75,7 @@ impl CassandraConnection {
         let (return_tx, return_rx) = mpsc::unbounded_channel::<ReturnChannel>();
         let (rx_process_has_shutdown_tx, rx_process_has_shutdown_rx) = oneshot::channel::<String>();
 
-        let destination = format!("{host:?}");
+        let destination = tokio::net::lookup_host(&host).await?.next().unwrap();
 
         if let Some(tls) = tls.as_mut() {
             let tls_stream = tls.connect(connect_timeout, host).await?;
@@ -71,7 +87,7 @@ impl CassandraConnection {
                     return_tx,
                     codec.clone(),
                     rx_process_has_shutdown_rx,
-                    destination.clone(),
+                    destination,
                 )
                 .in_current_span(),
             );
@@ -96,7 +112,7 @@ impl CassandraConnection {
                     return_tx,
                     codec.clone(),
                     rx_process_has_shutdown_rx,
-                    destination.clone(),
+                    destination,
                 )
                 .in_current_span(),
             );
@@ -156,7 +172,7 @@ async fn tx_process(
     codec: CassandraCodec,
     mut rx_process_has_shutdown_rx: oneshot::Receiver<String>,
     // Only used for error reporting
-    destination: String,
+    destination: SocketAddr,
 ) {
     let mut in_w = FramedWrite::new(write, codec);
 
@@ -167,10 +183,10 @@
     loop {
         if let Some(request) = out_rx.recv().await {
             if let Some(error) = &connection_dead_error {
-                send_error_to_request(request.return_chan, request.stream_id, &destination, error);
+                send_error_to_request(request.return_chan, request.stream_id, destination, error);
             } else if let Err(error) = in_w.send(vec![request.message]).await {
                 let error = format!("{:?}", error);
-                send_error_to_request(request.return_chan, request.stream_id, &destination, &error);
+                send_error_to_request(request.return_chan, request.stream_id, destination, &error);
                 connection_dead_error = Some(error.clone());
             } else if let Err(mpsc::error::SendError(return_chan)) = return_tx.send(ReturnChannel {
                 return_chan: request.return_chan,
@@ -182,7 +198,7 @@
                 send_error_to_request(
                     return_chan.return_chan,
                     return_chan.stream_id,
-                    &destination,
+                    destination,
                     &error,
                 );
                 connection_dead_error = Some(error.clone());
@@ -214,16 +230,15 @@
 fn send_error_to_request(
     return_chan: oneshot::Sender<Response>,
     stream_id: i16,
-    destination: &str,
+    destination: SocketAddr,
     error: &str,
 ) {
     return_chan
-        .send(Response {
+        .send(Err(ResponseError {
+            cause: anyhow!(error.to_owned()),
+            destination,
             stream_id,
-            response: Err(anyhow!(
-                "Connection to destination cassandra node {destination} was closed: {error}"
-            )),
-        })
+        }))
        .ok();
 }
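The new error type above leans on `thiserror` for its `Display`/`source` plumbing. Below is a rough, self-contained sketch of the same pattern; the `ClientFrame` type and `to_client_error` helper are invented stand-ins for shotover's `Message`/`to_response`, not real shotover APIs, and the example only assumes the `thiserror` and `anyhow` crates.

```rust
// Simplified, standalone analogue of the ResponseError pattern above.
// `ClientFrame` / `to_client_error` are made up purely for illustration.
use std::net::SocketAddr;

#[derive(Debug, thiserror::Error)]
#[error("Connection to destination cassandra node {destination} was closed: {cause}")]
pub struct ResponseError {
    #[source]
    pub cause: anyhow::Error,
    pub destination: SocketAddr,
    pub stream_id: i16,
}

/// Stand-in for a protocol frame sent back to the client.
#[derive(Debug)]
pub struct ClientFrame {
    pub stream_id: i16,
    pub body: String,
}

impl ResponseError {
    /// Turn the transport-level failure into a frame the client can understand,
    /// preserving the stream id so the client can match it to its request.
    pub fn to_client_error(&self) -> ClientFrame {
        ClientFrame {
            stream_id: self.stream_id,
            // `{:#}` prints the whole anyhow error chain on one line.
            body: format!("{:#}", self.cause),
        }
    }
}

pub type Response = Result<ClientFrame, ResponseError>;

fn main() {
    let err = ResponseError {
        cause: anyhow::anyhow!("connection reset by peer"),
        destination: "172.16.1.3:9044".parse().unwrap(),
        stream_id: 42,
    };
    println!("{err}");
    println!("{:?}", err.to_client_error());
}
```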
@@ -234,7 +249,7 @@ async fn rx_process(
     pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
     rx_process_has_shutdown_tx: oneshot::Sender<String>,
     // Only used for error reporting
-    destination: String,
+    destination: SocketAddr,
 ) {
     let mut reader = FramedRead::new(read, codec);
 
@@ -274,7 +289,7 @@
                         from_server.insert(stream_id, m);
                     },
                     Some(return_tx) => {
-                        return_tx.send(Response { stream_id, response: Ok(m) }).ok();
+                        return_tx.send(Ok(m)).ok();
                     }
                 }
             }
@@ -306,7 +321,7 @@
                         from_tx_process.insert(stream_id, return_chan);
                     }
                     Some(m) => {
-                        return_chan.send(Response { stream_id, response: Ok(m) }).ok();
+                        return_chan.send(Ok(m)).ok();
                    }
                }
            } else {
@@ -325,7 +340,7 @@ async fn send_errors_and_shutdown(
     mut return_rx: mpsc::UnboundedReceiver<ReturnChannel>,
     mut waiting: HashMap<i16, oneshot::Sender<Response>>,
     rx_process_has_shutdown_tx: oneshot::Sender<String>,
-    destination: String,
+    destination: SocketAddr,
     message: &str,
 ) {
     // Ensure we send this before closing return_rx.
@@ -337,15 +352,13 @@
     return_rx.close();
 
-    let full_message =
-        format!("Connection to destination cassandra node {destination} was closed: {message}");
-
     for (stream_id, return_tx) in waiting.drain() {
         return_tx
-            .send(Response {
+            .send(Err(ResponseError {
+                cause: anyhow!(message.to_owned()),
+                destination,
                 stream_id,
-                response: Err(anyhow!(full_message.to_owned())),
-            })
+            }))
             .ok();
     }
 
@@ -353,10 +366,11 @@
     while let Some(return_chan) = return_rx.recv().await {
         return_chan
             .return_chan
-            .send(Response {
+            .send(Err(ResponseError {
+                cause: anyhow!(message.to_owned()),
+                destination,
                 stream_id: return_chan.stream_id,
-                response: Err(anyhow!(full_message.to_owned())),
-            })
+            }))
             .ok();
     }
 }
@@ -365,20 +379,19 @@ pub async fn receive(
     timeout_duration: Option<Duration>,
     failed_requests: &metrics::Counter,
     mut results: FuturesOrdered<oneshot::Receiver<Response>>,
-    version: Version,
-) -> Result<Messages> {
+) -> Result<Vec<Response>> {
     let expected_size = results.len();
     let mut responses = Vec::with_capacity(expected_size);
     while responses.len() < expected_size {
         if let Some(timeout_duration) = timeout_duration {
             match timeout(
                 timeout_duration,
-                receive_message(failed_requests, &mut results, version),
+                receive_message(failed_requests, &mut results),
             )
             .await
             {
                 Ok(response) => {
-                    responses.push(response?);
+                    responses.push(response);
                 }
                 Err(_) => {
                     return Err(anyhow!(
@@ -389,7 +402,7 @@
                 }
             }
         } else {
-            responses.push(receive_message(failed_requests, &mut results, version).await?);
+            responses.push(receive_message(failed_requests, &mut results).await);
         }
     }
     Ok(responses)
@@ -398,14 +411,10 @@ pub async fn receive(
 async fn receive_message(
     failed_requests: &metrics::Counter,
     results: &mut FuturesOrdered<oneshot::Receiver<Response>>,
-    version: Version,
-) -> Result<Message> {
+) -> Result<Message, ResponseError> {
     match results.next().await {
         Some(result) => match result.expect("The tx_process task must always return a value") {
-            Response {
-                response: Ok(message),
-                ..
-            } => {
+            Ok(message) => {
                 if let Ok(Metadata::Cassandra(CassandraMetadata {
                     opcode: Opcode::Error,
                     ..
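`receive` drains a `FuturesOrdered` of oneshot receivers, so responses come back in the order the requests were issued, and each wait can optionally be capped by `tokio::time::timeout`. The sketch below shows that collection loop in isolation; `collect_responses` and the `String` payload are illustrative only, not shotover names, and it assumes the `tokio`, `futures` and `anyhow` crates.

```rust
// Standalone sketch of collecting ordered responses with an optional timeout,
// in the spirit of `receive`/`receive_message` above.
use anyhow::{anyhow, Result};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::timeout;

async fn collect_responses(
    timeout_duration: Option<Duration>,
    mut results: FuturesOrdered<oneshot::Receiver<String>>,
) -> Result<Vec<String>> {
    let expected = results.len();
    let mut responses = Vec::with_capacity(expected);
    while responses.len() < expected {
        let next = results.next();
        let response = match timeout_duration {
            // Give up if any single response takes too long.
            Some(duration) => timeout(duration, next)
                .await
                .map_err(|_| anyhow!("timed out waiting for a response"))?,
            None => next.await,
        };
        // `None` would mean we ran out of receivers before `expected` was reached.
        let response = response.expect("one receiver per expected response");
        responses.push(response.expect("sender must not be dropped"));
    }
    Ok(responses)
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut results = FuturesOrdered::new();
    for i in 0..3 {
        let (tx, rx) = oneshot::channel();
        tx.send(format!("response {i}")).unwrap();
        results.push_back(rx);
    }
    let responses = collect_responses(Some(Duration::from_secs(1)), results).await?;
    println!("{responses:?}");
    Ok(())
}
```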
@@ -415,12 +424,7 @@ async fn receive_message(
                 }
                 Ok(message)
             }
-            Response {
-                stream_id,
-                response: Err(err),
-            } => Ok(Message::from_frame(Frame::Cassandra(
-                CassandraFrame::shotover_error(stream_id, version, &format!("{:?}", err)),
-            ))),
+            err => err,
         },
         None => unreachable!("Ran out of responses"),
     }
diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
index b1166bdac..cb335075d 100644
--- a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
+++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
@@ -252,6 +252,12 @@ impl CassandraSinkCluster {
         }
 
         if self.nodes_rx.has_changed()? {
+            // This approach to keeping the nodes list up to date has a problem when a node goes down and then up again before this transform instance can process the node going down.
+            // When this happens we never detect that the node went down and a dead connection is left around.
+            // Broadcast channel's RecvError::Lagged would solve this problem but we can't use broadcast channels because cloning them doesn't keep a past value.
+            // It might be worth implementing a custom watch channel that supports Lagged errors to improve correctness.
+            //
+            // However, none of this is actually a problem because the dead connection detection logic handles this case for us.
             self.pool.update_nodes(&mut self.nodes_rx);
 
             // recreate the control connection if it is down
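The comment above hinges on the difference between tokio's two channel flavours: a `watch` receiver only ever sees the latest value, so a down-then-up flip can be missed entirely, while a `broadcast` receiver is told it fell behind via `RecvError::Lagged` but cannot be handed a previously sent value when it is created. A minimal sketch of both behaviours, assuming only the `tokio` crate:

```rust
// Minimal demonstration of the trade-off described in the comment above.
// Requires tokio with the "sync", "rt" and "macros" features.
use tokio::sync::{broadcast, watch};

#[tokio::main]
async fn main() {
    // watch: only the most recent value is kept, intermediate values are lost.
    let (watch_tx, mut watch_rx) = watch::channel("node up");
    watch_tx.send("node down").unwrap();
    watch_tx.send("node up again").unwrap();
    if watch_rx.has_changed().unwrap() {
        // We only observe the latest state; the "node down" transition was skipped silently.
        println!("watch sees: {}", *watch_rx.borrow_and_update());
    }

    // broadcast: a slow receiver is told how many messages it missed via RecvError::Lagged,
    // but a receiver created now would not be given any previously sent value at all.
    let (broadcast_tx, mut broadcast_rx) = broadcast::channel(1);
    broadcast_tx.send("node down").unwrap();
    broadcast_tx.send("node up again").unwrap();
    match broadcast_rx.recv().await {
        Err(broadcast::error::RecvError::Lagged(missed)) => {
            println!("broadcast receiver lagged behind by {missed} message(s)")
        }
        other => println!("broadcast sees: {other:?}"),
    }
}
```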
@@ -434,23 +440,18 @@ impl CassandraSinkCluster {
                         // send an unprepared error in response to force
                         // the client to reprepare the query
                         return_chan_tx
-                            .send(Response {
-                                stream_id: metadata.stream_id,
-                                response: Ok(Message::from_frame(Frame::Cassandra(
-                                    CassandraFrame {
-                                        operation: CassandraOperation::Error(ErrorBody {
-                                            message: "Shotover does not have this query's metadata. Please re-prepare on this Shotover host before sending again.".into(),
-                                            ty: ErrorType::Unprepared(UnpreparedError {
-                                                id,
-                                            }),
-                                        }),
-                                        stream_id: metadata.stream_id,
-                                        tracing: Tracing::Response(None), // We didn't actually hit a node so we don't have a tracing id
-                                        version: self.version.unwrap(),
-                                        warnings: vec![],
-                                    },
-                                ))),
-                            }).expect("the receiver is guaranteed to be alive, so this must succeed");
+                            .send(Ok(Message::from_frame(Frame::Cassandra(
+                                CassandraFrame {
+                                    operation: CassandraOperation::Error(ErrorBody {
+                                        message: "Shotover does not have this query's metadata. Please re-prepare on this Shotover host before sending again.".into(),
+                                        ty: ErrorType::Unprepared(UnpreparedError { id }),
+                                    }),
+                                    stream_id: metadata.stream_id,
+                                    tracing: Tracing::Response(None), // We didn't actually hit a node so we don't have a tracing id
+                                    version: self.version.unwrap(),
+                                    warnings: vec![],
+                                },
+                            )))).expect("the receiver is guaranteed to be alive, so this must succeed");
                     }
                     Err(GetReplicaErr::Other(err)) => {
                         return Err(err);
@@ -471,22 +472,37 @@ impl CassandraSinkCluster {
             responses_future.push_back(return_chan_rx)
         }
 
-        let mut responses = super::connection::receive(
-            self.read_timeout,
-            &self.failed_requests,
-            responses_future,
-            self.version.unwrap(),
-        )
-        .await?;
+        let response_results =
+            super::connection::receive(self.read_timeout, &self.failed_requests, responses_future)
+                .await?;
+        let mut responses = vec![];
+        for response in response_results {
+            match response {
+                Ok(response) => responses.push(response),
+                Err(error) => {
+                    self.pool.report_issue_with_node(error.destination);
+                    responses.push(error.to_response(self.version.unwrap()));
+                }
+            }
+        }
 
         {
-            let mut prepare_responses = super::connection::receive(
+            let prepare_response_results = super::connection::receive(
                 self.read_timeout,
                 &self.failed_requests,
                 responses_future_prepare,
-                self.version.unwrap(),
             )
             .await?;
+            let mut prepare_responses = vec![];
+            for response in prepare_response_results {
+                match response {
+                    Ok(response) => prepare_responses.push(response),
+                    Err(error) => {
+                        self.pool.report_issue_with_node(error.destination);
+                        prepare_responses.push(error.to_response(self.version.unwrap()));
+                    }
+                }
+            }
 
             let prepared_results: Vec<&mut Box<BodyResResultPrepared>> = prepare_responses
                 .iter_mut()
@@ -1119,11 +1135,7 @@ fn is_ddl_statement(request: &mut Message) -> bool {
 }
 
 fn is_use_statement_successful(response: Option<Result<Response>>) -> bool {
-    if let Some(Ok(Response {
-        response: Ok(mut response),
-        ..
-    })) = response
-    {
+    if let Some(Ok(Ok(mut response))) = response {
         if let Some(Frame::Cassandra(CassandraFrame {
             operation: CassandraOperation::Result(CassandraResult::SetKeyspace(_)),
             ..
diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs
index c63290592..c95f4b8e3 100644
--- a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs
+++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs
@@ -124,7 +124,7 @@ impl ConnectionFactory {
             })?;
             return_chan_rx.await.map_err(|e| {
                 anyhow!(e).context("Failed to initialize new connection with handshake, rx failed")
-            })?;
+            })??;
         }
 
         if let Some(use_message) = &self.use_message {
@@ -138,7 +138,7 @@ impl ConnectionFactory {
             return_chan_rx.await.map_err(|e| {
                 anyhow!(e)
                     .context("Failed to initialize new connection with use message, rx failed")
-            })?;
+            })??;
         }
 
         Ok(outbound)
diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs
index cd4ccd16e..efcf30794 100644
--- a/shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs
+++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs
@@ -80,6 +80,15 @@ impl NodePool {
         self.token_map = TokenMap::new(self.nodes.as_slice());
     }
 
+    pub fn report_issue_with_node(&mut self, address: SocketAddr) {
+        for node in &mut self.nodes {
+            if node.address == address {
+                node.is_up = false;
+                node.outbound = None;
+            }
+        }
+    }
+
     pub async fn update_keyspaces(&mut self, keyspaces_rx: &mut KeyspaceChanRx) {
         let updated_keyspaces = keyspaces_rx.borrow_and_update().clone();
         self.keyspace_metadata = updated_keyspaces;
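The `})??` changes above (and the `rx.await??` calls in topology.rs below) are easy to misread: the first `?` propagates a failure of the oneshot channel itself (the sender was dropped), while the second `?` propagates the `ResponseError` that the channel now carries as its payload. A standalone sketch of the same double propagation, using plain `anyhow` errors instead of shotover's types (`request_over_channel` is an invented name):

```rust
// Standalone sketch of awaiting a oneshot receiver whose payload is itself a Result,
// mirroring the `rx.await??` / `})??` pattern above. Requires tokio + anyhow.
use anyhow::{anyhow, Result};
use tokio::sync::oneshot;

async fn request_over_channel(fail: bool) -> Result<String> {
    let (tx, rx) = oneshot::channel::<Result<String>>();

    tokio::spawn(async move {
        let response = if fail {
            Err(anyhow!("destination node closed the connection"))
        } else {
            Ok("row data".to_owned())
        };
        // Ignore the case where the receiver has already gone away.
        tx.send(response).ok();
    });

    // First `?`: the channel itself failed (sender dropped without sending).
    // Second `?`: the channel delivered a value, but that value is an error.
    let message = rx.await.map_err(|e| anyhow!(e).context("rx failed"))??;
    Ok(message)
}

#[tokio::main]
async fn main() {
    println!("{:?}", request_over_channel(false).await);
    println!("{:?}", request_over_channel(true).await);
}
```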
diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs
index 042f8f243..12a4acb46 100644
--- a/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs
+++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs
@@ -204,7 +204,7 @@ async fn register_for_topology_and_status_events(
     )
     .unwrap();
 
-    if let Some(Frame::Cassandra(CassandraFrame { operation, .. })) = rx.await?.response?.frame() {
+    if let Some(Frame::Cassandra(CassandraFrame { operation, .. })) = rx.await??.frame() {
         match operation {
             CassandraOperation::Ready(_) => Ok(()),
             operation => Err(anyhow!("Expected Cassandra to respond to a Register with a Ready. Instead it responded with {:?}", operation))
        }
@@ -259,7 +259,7 @@ mod system_keyspaces {
             tx,
         )?;
 
-        let response = rx.await?.response?;
+        let response = rx.await??;
         into_keyspaces(response, data_center)
     }
 
@@ -386,7 +386,7 @@ mod system_local {
             tx,
         )?;
 
-        into_nodes(rx.await?.response?, data_center, address)
+        into_nodes(rx.await??, data_center, address)
     }
 
     fn into_nodes(
@@ -473,7 +473,7 @@ mod system_peers {
             tx,
         )?;
 
-        let mut response = rx.await?.response?;
+        let mut response = rx.await??;
 
         if is_peers_v2_does_not_exist_error(&mut response) {
             let (tx, rx) = oneshot::channel();
@@ -492,7 +492,7 @@ mod system_peers {
                 })),
                 tx,
             )?;
-            response = rx.await?.response?;
+            response = rx.await??;
         }
 
         into_nodes(response, data_center)
diff --git a/shotover-proxy/src/transforms/cassandra/sink_single.rs b/shotover-proxy/src/transforms/cassandra/sink_single.rs
index 6b7ae9844..c1fc72a19 100644
--- a/shotover-proxy/src/transforms/cassandra/sink_single.rs
+++ b/shotover-proxy/src/transforms/cassandra/sink_single.rs
@@ -138,13 +138,17 @@ impl CassandraSinkSingle {
             })
             .collect();
 
-        super::connection::receive(
-            self.read_timeout,
-            &self.failed_requests,
-            responses_future?,
-            self.version.unwrap(),
-        )
-        .await
+        super::connection::receive(self.read_timeout, &self.failed_requests, responses_future?)
+            .await
+            .map(|responses| {
+                responses
+                    .into_iter()
+                    .map(|response| match response {
+                        Ok(response) => response,
+                        Err(error) => error.to_response(self.version.unwrap()),
+                    })
+                    .collect()
+            })
     }
 }
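Both `sink_cluster` and `sink_single` now post-process the per-request results the same way: an `Ok` response is passed through, while an `Err` marks the offending node and is replaced with an error frame so the client still gets an answer for that stream id. Below is a simplified, self-contained sketch of that shape; `FakeError`, `finalise_responses` and the `HashSet` of down nodes are invented for illustration, whereas shotover's real pieces are `ResponseError`, `to_response` and `NodePool::report_issue_with_node`.

```rust
// Simplified sketch of the "convert connection errors into client responses,
// and report the dead node" loop used by both sinks above.
use std::collections::HashSet;
use std::net::SocketAddr;

/// Stand-in for `ResponseError`: which node failed, on which stream, and why.
#[derive(Debug)]
struct FakeError {
    destination: SocketAddr,
    stream_id: i16,
    cause: String,
}

impl FakeError {
    /// Stand-in for `ResponseError::to_response`: build a reply the client can parse.
    fn to_response(&self) -> String {
        format!("ERROR(stream {}): {}", self.stream_id, self.cause)
    }
}

fn finalise_responses(
    results: Vec<Result<String, FakeError>>,
    down_nodes: &mut HashSet<SocketAddr>,
) -> Vec<String> {
    let mut responses = Vec::with_capacity(results.len());
    for result in results {
        match result {
            Ok(response) => responses.push(response),
            Err(error) => {
                // Equivalent of `self.pool.report_issue_with_node(error.destination)`.
                down_nodes.insert(error.destination);
                // The client still gets a response for this stream id.
                responses.push(error.to_response());
            }
        }
    }
    responses
}

fn main() {
    let mut down_nodes = HashSet::new();
    let results = vec![
        Ok("row 1".to_owned()),
        Err(FakeError {
            destination: "172.16.1.3:9044".parse().unwrap(),
            stream_id: 7,
            cause: "connection closed".to_owned(),
        }),
    ];
    println!("{:?}", finalise_responses(results, &mut down_nodes));
    println!("nodes now considered down: {down_nodes:?}");
}
```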
diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster/single_rack_v4.rs b/shotover-proxy/tests/cassandra_int_tests/cluster/single_rack_v4.rs
index e73339012..63c19d2e1 100644
--- a/shotover-proxy/tests/cassandra_int_tests/cluster/single_rack_v4.rs
+++ b/shotover-proxy/tests/cassandra_int_tests/cluster/single_rack_v4.rs
@@ -2,12 +2,12 @@ use crate::cassandra_int_tests::cluster::run_topology_task;
 use crate::helpers::cassandra::{
     assert_query_result, run_query, CassandraConnection, CassandraDriver, ResultValue,
 };
-use crate::helpers::ShotoverManager;
 use cassandra_protocol::events::ServerEvent;
 use cassandra_protocol::frame::events::{StatusChange, StatusChangeType};
 use std::net::SocketAddr;
 use std::time::Duration;
 use test_helpers::docker_compose::DockerCompose;
+use tokio::sync::broadcast;
 use tokio::time::{sleep, timeout};
 
 async fn test_rewrite_system_peers(connection: &CassandraConnection) {
@@ -275,12 +275,7 @@ pub async fn test_topology_task(ca_path: Option<&str>, cassandra_port: Option<u16>) {
+struct EventConnections {
+    _direct: CassandraConnection,
+    recv_direct: broadcast::Receiver<ServerEvent>,
+    _shotover: CassandraConnection,
+    recv_shotover: broadcast::Receiver<ServerEvent>,
+}
+
+impl EventConnections {
+    async fn new() -> Self {
+        let direct = CassandraConnection::new("172.16.1.2", 9044, CassandraDriver::CdrsTokio).await;
+        let recv_direct = direct.as_cdrs().create_event_receiver();
+
+        let shotover =
+            CassandraConnection::new("127.0.0.1", 9042, CassandraDriver::CdrsTokio).await;
+        let recv_shotover = shotover.as_cdrs().create_event_receiver();
+
+        // let the driver finish connecting to the cluster and registering for the events
+        sleep(Duration::from_secs(10)).await;
+        EventConnections {
+            _direct: direct,
+            recv_direct,
+            _shotover: shotover,
+            recv_shotover,
+        }
+    }
+}
+
+async fn assert_down_event(event_connections: &mut EventConnections) {
+    loop {
         // The direct connection should allow all events to pass through
-        let event = timeout(Duration::from_secs(120), event_recv_direct.recv())
-            .await
-            .unwrap()
-            .unwrap();
-        assert_eq!(
+        let event = timeout(
+            Duration::from_secs(120),
+            event_connections.recv_direct.recv(),
+        )
+        .await
+        .unwrap()
+        .unwrap();
+
+        // Sometimes we get up status events if we connect early enough.
+        // I assume these are just due to the nodes initially joining the cluster.
+        // If we hit one, skip it and continue searching for our expected down status event.
+        if matches!(
             event,
             ServerEvent::StatusChange(StatusChange {
                 change_type: StatusChangeType::Up,
+                ..
+            })
+        ) {
+            continue;
+        }
+
+        assert_eq!(
+            event,
+            ServerEvent::StatusChange(StatusChange {
+                change_type: StatusChangeType::Down,
                 addr: "172.16.1.3:9044".parse().unwrap()
             })
         );
-        // we have already received an event directly from the cassandra instance so its reasonable to
-        // expect shotover to have processed that event within 10 seconds if it was ever going to
-        timeout(Duration::from_secs(10), event_recv_shotover.recv())
-            .await
-            .expect_err("CassandraSinkCluster must filter out this event");
-
-        let new_new_connection = CassandraConnection::new("127.0.0.1", 9042, driver).await;
-
-        test_connection_handles_node_down(&new_new_connection, driver).await;
-        test_connection_handles_node_down(&new_connection, driver).await;
-        test_connection_handles_node_down(&connection_shotover, driver).await;
+        break;
     }
 
-    std::mem::drop(connection_shotover);
-    // Purposefully dispose of these as we left the underlying cassandra cluster in a non-recoverable state
-    std::mem::drop(shotover_manager);
-    std::mem::drop(compose);
+    // we have already received an event directly from the cassandra instance so it's reasonable to
+    // expect shotover to have processed that event within 10 seconds if it was ever going to
+    timeout(
+        Duration::from_secs(10),
+        event_connections.recv_shotover.recv(),
+    )
+    .await
+    .expect_err("CassandraSinkCluster must filter out this event");
+}
+
+async fn assert_up_event(event_connections: &mut EventConnections) {
+    // The direct connection should allow all events to pass through
+    let event = timeout(
+        Duration::from_secs(120),
+        event_connections.recv_direct.recv(),
+    )
+    .await
+    .unwrap()
+    .unwrap();
+    assert_eq!(
+        event,
+        ServerEvent::StatusChange(StatusChange {
+            change_type: StatusChangeType::Up,
+            addr: "172.16.1.3:9044".parse().unwrap()
+        })
+    );
+    // we have already received an event directly from the cassandra instance so it's reasonable to
+    // expect shotover to have processed that event within 10 seconds if it was ever going to
+    timeout(
+        Duration::from_secs(10),
+        event_connections.recv_shotover.recv(),
+    )
+    .await
+    .expect_err("CassandraSinkCluster must filter out this event");
 }
 
 async fn test_connection_handles_node_down(
@@ -418,3 +487,23 @@ async fn test_connection_handles_node_down(
         .await;
     }
 }
+
+async fn test_connection_handles_node_down_with_one_retry(connection: &CassandraConnection) {
+    // run this a few times to make sure we aren't getting lucky with the routing
+    let mut fail_count = 0;
+    for _ in 0..50 {
+        if connection
+            .execute_fallible(
+                "SELECT pk, col1, col2 FROM cluster_single_rack_node_going_down.test_table;",
+            )
+            .await
+            .is_err()
+        {
+            fail_count += 1;
+        }
+    }
+    assert!(
+        fail_count == 0 || fail_count == 1,
+        "must never fail or fail only once. The case where it fails once indicates that the connection to cassandra-two was dead but recreated, allowing the next query to succeed"
+    )
+}
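The event assertions above all rely on the same timing idiom: wait generously for the event that must arrive on the direct connection, then require `timeout` to expire on the shotover connection to prove the event was filtered out. Below is a standalone sketch of that `expect_err` idiom with a plain `broadcast` channel standing in for the driver's `ServerEvent` stream; the channel setup and event strings are invented for illustration.

```rust
// Standalone sketch of the "this event must NOT arrive" assertion used above.
// Requires tokio with the "sync", "time", "rt" and "macros" features.
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let (event_tx, mut direct_rx) = broadcast::channel::<String>(16);
    let mut shotover_rx: broadcast::Receiver<String> = event_tx.subscribe();

    // Simulate the cluster emitting an event that only the direct connection should observe.
    event_tx.send("status change: down".to_owned()).unwrap();

    // The direct receiver must see the event within the deadline...
    let event = timeout(Duration::from_secs(5), direct_rx.recv())
        .await
        .expect("event must arrive in time")
        .expect("channel must stay open");
    assert_eq!(event, "status change: down");

    // ...whereas a receiver that is supposed to be filtered must time out.
    // (Here we simply drain it first to fake "shotover swallowed the event".)
    shotover_rx.recv().await.unwrap();
    timeout(Duration::from_millis(100), shotover_rx.recv())
        .await
        .expect_err("no further event may be delivered");

    println!("event filtering assertions passed");
}
```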
diff --git a/shotover-proxy/tests/cassandra_int_tests/mod.rs b/shotover-proxy/tests/cassandra_int_tests/mod.rs
index 73016a81a..4072a9fe8 100644
--- a/shotover-proxy/tests/cassandra_int_tests/mod.rs
+++ b/shotover-proxy/tests/cassandra_int_tests/mod.rs
@@ -191,6 +191,8 @@ async fn cluster_single_rack_v4(#[case] driver: CassandraDriver) {
             .enable_schema_awaiter("172.16.1.2:9044", None)
             .await;
         native_types::test(&connection2).await;
+
+        cluster::single_rack_v4::test_node_going_down(&compose, driver).await;
     }
 
     {
@@ -202,25 +204,6 @@ async fn cluster_single_rack_v4(#[case] driver: CassandraDriver) {
     }
 
     cluster::single_rack_v4::test_topology_task(None, Some(9044)).await;
-
-    let shotover_manager =
-        ShotoverManager::from_topology_file("example-configs/cassandra-cluster-v4/topology.yaml");
-    cluster::single_rack_v4::test_node_going_down(compose, shotover_manager, driver, false).await;
-}
-
-#[cfg(feature = "alpha-transforms")]
-#[cfg(feature = "cassandra-cpp-driver-tests")]
-#[rstest]
-//#[case::cdrs(CdrsTokio)]
-#[cfg_attr(feature = "cassandra-cpp-driver-tests", case::datastax(Datastax))]
-#[tokio::test(flavor = "multi_thread")]
-#[serial]
-async fn cluster_single_rack_node_lost(#[case] driver: CassandraDriver) {
-    let compose = DockerCompose::new("example-configs/cassandra-cluster-v4/docker-compose.yaml");
-
-    let shotover_manager =
-        ShotoverManager::from_topology_file("example-configs/cassandra-cluster-v4/topology.yaml");
-    cluster::single_rack_v4::test_node_going_down(compose, shotover_manager, driver, true).await;
-}
 
 #[rstest]