diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs index 0180d6064..e7378123d 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs @@ -27,14 +27,18 @@ pub fn create_topology_task( tokio::spawn(async move { while let Some(mut connection_info) = connection_info_rx.recv().await { let mut attempts = 0; - while let Err(err) = - topology_task_process(&nodes_tx, &mut connection_info, &data_center).await - { - tracing::error!("topology task failed, retrying, error was: {err:?}"); - attempts += 1; - if attempts > 3 { - // 3 attempts have failed, lets try a new handshake - break; + match topology_task_process(&nodes_tx, &mut connection_info, &data_center).await { + Err(err) => { + tracing::error!("topology task failed, retrying, error was: {err:?}"); + attempts += 1; + if attempts > 3 { + // 3 attempts have failed, lets try a new handshake + break; + } + } + Ok(()) => { + // cleanly shutdown the task + return; } } } @@ -59,12 +63,20 @@ async fn topology_task_process( let version = connection_info.connection_factory.get_version()?; let mut nodes = fetch_current_nodes(&connection, connection_info, data_center).await?; - nodes_tx.send(nodes.clone())?; + if let Err(watch::error::SendError(_)) = nodes_tx.send(nodes.clone()) { + return Ok(()); + } register_for_topology_and_status_events(&connection, version).await?; loop { - match pushed_messages_rx.recv().await { + // Wait for events to come in from the cassandra node. + // If all the nodes receivers are closed then immediately stop listening and shutdown the task + let pushed_messages = tokio::select! { + pushed_messages = pushed_messages_rx.recv() => pushed_messages, + _ = nodes_tx.closed() => return Ok(()) + }; + match pushed_messages { Some(messages) => { for mut message in messages { if let Some(Frame::Cassandra(CassandraFrame { @@ -114,11 +126,11 @@ async fn topology_task_process( } } } - None => { - return Err(anyhow!("topology control connection was closed")); - } + None => return Err(anyhow!("topology control connection was closed")), + } + if let Err(watch::error::SendError(_)) = nodes_tx.send(nodes.clone()) { + return Ok(()); } - nodes_tx.send(nodes.clone())?; } }