Skip to content

Commit

Permalink
CassandraSinkCluster index removal (#1555)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Apr 2, 2024
1 parent 6dff7b7 commit 58ed58e
Showing 1 changed file with 16 additions and 37 deletions.
53 changes: 16 additions & 37 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::frame::cassandra::{CassandraMetadata, Tracing};
use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use crate::message::{Message, MessageIdMap, Messages, Metadata};
use crate::tls::{TlsConnector, TlsConnectorConfig};
use crate::transforms::cassandra::connection::{CassandraConnection, Response, ResponseError};
use crate::transforms::cassandra::connection::{CassandraConnection, ResponseError};
use crate::transforms::{
Transform, TransformBuilder, TransformConfig, TransformContextBuilder, TransformContextConfig,
Wrapper,
Expand All @@ -19,7 +19,6 @@ use cassandra_protocol::types::CBytesShort;
use cql3_parser::cassandra_statement::CassandraStatement;
use cql3_parser::common::IdentifierRef;
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use metrics::{counter, Counter};
use node::{CassandraNode, ConnectionFactory};
use node_pool::{GetReplicaErr, KeyspaceMetadata, NodePool};
Expand Down Expand Up @@ -347,8 +346,6 @@ impl CassandraSinkCluster {

let mut responses_future = FuturesOrdered::new();

let mut responses_future_use = FuturesOrdered::new();
let mut use_future_index_to_node_index = vec![];
for mut message in messages.into_iter() {
let return_chan_rx = if self.pool.nodes().is_empty()
|| !self.init_handshake_complete
Expand All @@ -363,16 +360,23 @@ impl CassandraSinkCluster {
// created will have the correct keyspace setup.
self.connection_factory.set_use_message(message.clone());

// Send the USE statement to all open connections to ensure they are all in sync
for (node_index, node) in self.pool.nodes().iter().enumerate() {
if let Some(connection) = &node.outbound {
responses_future_use.push_back(connection.send(message.clone())?);
use_future_index_to_node_index.push(node_index);
}
// route to the control connection
let result = self.control_connection.as_mut().unwrap().send(message)?;

// Close all other connections as they are now invalidated.
// They will be recreated as needed with the correct use statement used automatically after the handshake
for node in self.pool.nodes_mut() {
node.outbound = None;
}

// Send the USE statement to the handshake connection and use the response as shotovers response
self.control_connection.as_mut().unwrap().send(message)?
// Sending the use statement to these connections to keep them alive instead is possible but tricky.
// 1. The destinations need to be calculated here, at sending time, to ensure no new connections have been created in the meantime.
// 2. We need a way to filter out these extra responses from reaching the client.
// 3. But we cant use the TableRewrite abstraction since that occurs too early. See 1.
//
// It might be worth doing in the future.

result
} else if is_prepare_message(&mut message) {
let next_host_id = self.message_rewriter.get_destination_for_prepare(&message);
match self
Expand Down Expand Up @@ -507,18 +511,6 @@ impl CassandraSinkCluster {
}
}

for node_index in use_future_index_to_node_index {
let response = responses_future_use
.next()
.await
.map(|x| x.map_err(|e| anyhow!(e)));
// If any errors occurred close the connection as we can no
// longer make any guarantees about the current state of the connection
if !is_use_statement_successful(response) {
self.pool.nodes_mut()[node_index].outbound = None;
}
}

self.message_rewriter.rewrite_responses(&mut responses)?;

for response in responses.iter_mut() {
Expand Down Expand Up @@ -699,19 +691,6 @@ fn is_ddl_statement(request: &mut Message) -> bool {
false
}

fn is_use_statement_successful(response: Option<Result<Response>>) -> bool {
if let Some(Ok(Ok(mut response))) = response {
if let Some(Frame::Cassandra(CassandraFrame {
operation: CassandraOperation::Result(CassandraResult::SetKeyspace(_)),
..
})) = response.frame()
{
return true;
}
}
false
}

#[async_trait]
impl Transform for CassandraSinkCluster {
fn get_name(&self) -> &'static str {
Expand Down

0 comments on commit 58ed58e

Please sign in to comment.