Skip to content

Commit

Permalink
Splitup sink_cluster.rs (#752)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Aug 22, 2022
1 parent d9d306e commit 79c181f
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use super::connection::CassandraConnection;
use crate::codec::cassandra::CassandraCodec;
use crate::error::ChainResponse;
use crate::frame::cassandra::parse_statement_single;
use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use crate::message::{Message, MessageValue, Messages};
use crate::tls::{TlsConnector, TlsConnectorConfig};
use crate::transforms::cassandra::connection::CassandraConnection;
use crate::transforms::util::Response;
use crate::transforms::{Transform, Transforms, Wrapper};
use anyhow::{anyhow, Result};
Expand All @@ -16,14 +16,16 @@ use cql3_parser::common::FQName;
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use metrics::{register_counter, Counter};
use node::CassandraNode;
use rand::prelude::*;
use serde::Deserialize;
use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::ToSocketAddrs;
use tokio::sync::{mpsc, oneshot, RwLock};

mod node;

#[derive(Deserialize, Debug, Clone)]
pub struct CassandraSinkClusterConfig {
pub first_contact_points: Vec<String>,
Expand Down Expand Up @@ -303,7 +305,8 @@ async fn topology_task_process(
handshake: &TaskHandshake,
data_center: &str,
) -> Result<()> {
let outbound = new_connection(&handshake.address, &handshake.handshake, tls, &None).await?;
let outbound =
node::new_connection(&handshake.address, &handshake.handshake, tls, &None).await?;

let (peers_tx, peers_rx) = oneshot::channel();
outbound.send(
Expand Down Expand Up @@ -466,56 +469,8 @@ impl Transform for CassandraSinkCluster {
}
}

#[derive(Debug, Clone)]
pub struct CassandraNode {
pub address: IpAddr,
pub _rack: String,
pub _tokens: Vec<String>,
pub outbound: Option<CassandraConnection>,
}

#[derive(Debug)]
pub struct TaskHandshake {
pub handshake: Vec<Message>,
pub address: SocketAddr,
}

impl CassandraNode {
async fn get_connection(
&mut self,
handshake: &[Message],
tls: &Option<TlsConnector>,
pushed_messages_tx: &Option<mpsc::UnboundedSender<Messages>>,
) -> Result<&mut CassandraConnection> {
if self.outbound.is_none() {
self.outbound = Some(
new_connection((self.address, 9042), handshake, tls, pushed_messages_tx).await?,
)
}

Ok(self.outbound.as_mut().unwrap())
}
}

async fn new_connection<A: ToSocketAddrs>(
address: A,
handshake: &[Message],
tls: &Option<TlsConnector>,
pushed_messages_tx: &Option<mpsc::UnboundedSender<Messages>>,
) -> Result<CassandraConnection> {
let outbound = CassandraConnection::new(
address,
CassandraCodec::new(),
tls.clone(),
pushed_messages_tx.clone(),
)
.await?;

for handshake_message in handshake {
let (return_chan_tx, return_chan_rx) = oneshot::channel();
outbound.send(handshake_message.clone(), return_chan_tx)?;
return_chan_rx.await?;
}

Ok(outbound)
}
56 changes: 56 additions & 0 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use crate::codec::cassandra::CassandraCodec;
use crate::message::{Message, Messages};
use crate::tls::TlsConnector;
use crate::transforms::cassandra::connection::CassandraConnection;
use anyhow::Result;
use std::net::IpAddr;
use tokio::net::ToSocketAddrs;
use tokio::sync::{mpsc, oneshot};

#[derive(Debug, Clone)]
pub struct CassandraNode {
pub address: IpAddr,
pub _rack: String,
pub _tokens: Vec<String>,
pub outbound: Option<CassandraConnection>,
}

impl CassandraNode {
pub async fn get_connection(
&mut self,
handshake: &[Message],
tls: &Option<TlsConnector>,
pushed_messages_tx: &Option<mpsc::UnboundedSender<Messages>>,
) -> Result<&mut CassandraConnection> {
if self.outbound.is_none() {
self.outbound = Some(
new_connection((self.address, 9042), handshake, tls, pushed_messages_tx).await?,
)
}

Ok(self.outbound.as_mut().unwrap())
}
}

pub async fn new_connection<A: ToSocketAddrs>(
address: A,
handshake: &[Message],
tls: &Option<TlsConnector>,
pushed_messages_tx: &Option<mpsc::UnboundedSender<Messages>>,
) -> Result<CassandraConnection> {
let outbound = CassandraConnection::new(
address,
CassandraCodec::new(),
tls.clone(),
pushed_messages_tx.clone(),
)
.await?;

for handshake_message in handshake {
let (return_chan_tx, return_chan_rx) = oneshot::channel();
outbound.send(handshake_message.clone(), return_chan_tx)?;
return_chan_rx.await?;
}

Ok(outbound)
}

0 comments on commit 79c181f

Please sign in to comment.