Skip to content

Commit

Permalink
rename init_handshake_connection -> control_connection (#874)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Oct 24, 2022
1 parent f4b5a7f commit 5b7ef5e
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ pub struct CassandraSinkCluster {
connection_factory: ConnectionFactory,

shotover_peers: Vec<ShotoverNode>,
init_handshake_connection: Option<CassandraConnection>,
init_handshake_address: Option<SocketAddr>,
// Used for any messages that require a consistent destination, this includes:
// * The initial handshake
// * DDL queries
// * system queries
control_connection: Option<CassandraConnection>,
control_connection_address: Option<SocketAddr>,
init_handshake_complete: bool,

chain_name: String,
Expand All @@ -121,9 +125,9 @@ impl Clone for CassandraSinkCluster {
Self {
contact_points: self.contact_points.clone(),
shotover_peers: self.shotover_peers.clone(),
init_handshake_connection: None,
control_connection: None,
connection_factory: self.connection_factory.new_with_same_config(),
init_handshake_address: None,
control_connection_address: None,
init_handshake_complete: false,
chain_name: self.chain_name.clone(),
failed_requests: self.failed_requests.clone(),
Expand Down Expand Up @@ -169,8 +173,8 @@ impl CassandraSinkCluster {
contact_points,
connection_factory: ConnectionFactory::new(tls),
shotover_peers,
init_handshake_connection: None,
init_handshake_address: None,
control_connection: None,
control_connection_address: None,
init_handshake_complete: false,
chain_name,
failed_requests,
Expand Down Expand Up @@ -235,7 +239,7 @@ impl CassandraSinkCluster {

// Create the initial connection.
// Messages will be sent through this connection until we have extracted the handshake.
if self.init_handshake_connection.is_none() {
if self.control_connection.is_none() {
let random_point = if self.pool.nodes().iter().all(|x| !x.is_up) {
tokio::net::lookup_host(self.contact_points.choose(&mut self.rng).unwrap())
.await?
Expand All @@ -247,9 +251,9 @@ impl CassandraSinkCluster {
.address
};

self.init_handshake_connection =
self.control_connection =
Some(self.connection_factory.new_connection(random_point).await?);
self.init_handshake_address = Some(random_point);
self.control_connection_address = Some(random_point);
}

if !self.init_handshake_complete {
Expand Down Expand Up @@ -285,7 +289,7 @@ impl CassandraSinkCluster {
|| is_ddl_statement(&mut message)
|| self.is_system_query(&mut message)
{
self.init_handshake_connection
self.control_connection
.as_mut()
.unwrap()
.send(message, return_chan_tx)?;
Expand All @@ -305,7 +309,7 @@ impl CassandraSinkCluster {
}

// Send the USE statement to the handshake connection and use the response as shotovers response
self.init_handshake_connection
self.control_connection
.as_mut()
.unwrap()
.send(message, return_chan_tx)?;
Expand Down Expand Up @@ -484,7 +488,7 @@ impl CassandraSinkCluster {
if let Ok(permit) = self.task_handshake_tx.try_reserve() {
permit.send(TaskConnectionInfo {
connection_factory: self.connection_factory.clone(),
address: self.init_handshake_address.unwrap(),
address: self.control_connection_address.unwrap(),
})
}
self.init_handshake_complete = true;
Expand All @@ -500,16 +504,16 @@ impl CassandraSinkCluster {
.pool
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack)
.address;
self.init_handshake_connection = Some(
self.control_connection = Some(
self.connection_factory
.new_connection(random_address)
.await?,
);
self.init_handshake_address = Some(random_address);
self.control_connection_address = Some(random_address);
}
tracing::info!(
"Control connection finalized against node at: {:?}",
self.init_handshake_address.unwrap()
self.control_connection_address.unwrap()
);

Ok(())
Expand Down

0 comments on commit 5b7ef5e

Please sign in to comment.