Skip to content

Commit

Permalink
CassandraSinkCluster: cleanly shutdown topology task (#861)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Oct 18, 2022
1 parent cf87466 commit 2d7be5b
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@ pub fn create_topology_task(
tokio::spawn(async move {
while let Some(mut connection_info) = connection_info_rx.recv().await {
let mut attempts = 0;
while let Err(err) =
topology_task_process(&nodes_tx, &mut connection_info, &data_center).await
{
tracing::error!("topology task failed, retrying, error was: {err:?}");
attempts += 1;
if attempts > 3 {
// 3 attempts have failed, lets try a new handshake
break;
match topology_task_process(&nodes_tx, &mut connection_info, &data_center).await {
Err(err) => {
tracing::error!("topology task failed, retrying, error was: {err:?}");
attempts += 1;
if attempts > 3 {
// 3 attempts have failed, lets try a new handshake
break;
}
}
Ok(()) => {
// cleanly shutdown the task
return;
}
}
}
Expand All @@ -59,12 +63,20 @@ async fn topology_task_process(
let version = connection_info.connection_factory.get_version()?;

let mut nodes = fetch_current_nodes(&connection, connection_info, data_center).await?;
nodes_tx.send(nodes.clone())?;
if let Err(watch::error::SendError(_)) = nodes_tx.send(nodes.clone()) {
return Ok(());
}

register_for_topology_and_status_events(&connection, version).await?;

loop {
match pushed_messages_rx.recv().await {
// Wait for events to come in from the cassandra node.
// If all the nodes receivers are closed then immediately stop listening and shutdown the task
let pushed_messages = tokio::select! {
pushed_messages = pushed_messages_rx.recv() => pushed_messages,
_ = nodes_tx.closed() => return Ok(())
};
match pushed_messages {
Some(messages) => {
for mut message in messages {
if let Some(Frame::Cassandra(CassandraFrame {
Expand Down Expand Up @@ -114,11 +126,11 @@ async fn topology_task_process(
}
}
}
None => {
return Err(anyhow!("topology control connection was closed"));
}
None => return Err(anyhow!("topology control connection was closed")),
}
if let Err(watch::error::SendError(_)) = nodes_tx.send(nodes.clone()) {
return Ok(());
}
nodes_tx.send(nodes.clone())?;
}
}

Expand Down

0 comments on commit 2d7be5b

Please sign in to comment.