Skip to content

Commit

Permalink
Misc cleanup around sources (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Aug 17, 2021
1 parent 9ded46b commit 00a8b04
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 49 deletions.
2 changes: 0 additions & 2 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ tokio-util = { version = "0.6.1", features = ["full"]}
tokio-stream = "0.1.2"
bytes = "1.0.0"
futures = "0.3.12"
#futures = { path = "/home/ben/git/futures-rs/futures" }
futures-core = "0.3.1"
chrono = {version = "0.4.10", features = ["serde"]}
async-trait = "0.1.30"
Expand Down Expand Up @@ -65,7 +64,6 @@ mlua_serde = { version = "0.5", features = ["lua53"] }

#wasm
wasmer-runtime = "0.17.1"
#wasmer-runtime = "0.13.1"

#Crypto
sodiumoxide = "0.2.5"
Expand Down
40 changes: 11 additions & 29 deletions shotover-proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
// "forget" the permit, which drops the permit value **without**
// incrementing the semaphore's permits. Then, in the handler task
// we manually add a new permit when processing completes.
// self.limit_connections.acquire().await.forget();

if self.hard_connection_limit {
match self.limit_connections.try_acquire() {
Ok(p) => {
Expand Down Expand Up @@ -170,7 +168,6 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
let mut handler = Handler {
// Get a handle to the shared database. Internally, this is an
// `Arc`, so a clone only increments the ref count.
// chain: self.chain.clone(),
chain: self.chain.clone(),
client_details: peer,
conn_details: conn_string,
Expand All @@ -193,7 +190,6 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
// dropped.
_shutdown_complete: self.shutdown_complete_tx.clone(),
};
// let chain = self.chain.clone();

// Spawn a new task to process the connections. Tokio tasks are like
// asynchronous green threads and are executed concurrently.
Expand Down Expand Up @@ -300,7 +296,7 @@ impl<C: Codec + 'static> Handler<C> {
pub async fn run(&mut self, stream: TcpStream) -> Result<()> {
// As long as the shutdown signal has not been received, try to read a
// new request frame.
let mut idle_time: u64 = 1;
let mut idle_time_seconds: u64 = 1;

let (in_tx, mut in_rx) = tokio::sync::mpsc::unbounded_channel::<Messages>();
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<Messages>();
Expand Down Expand Up @@ -331,30 +327,26 @@ impl<C: Codec + 'static> Handler<C> {
});

while !self.shutdown.is_shutdown() {
// While reading a request frame, also listen for the shutdown
// signal

// While reading a request frame, also listen for the shutdown signal
trace!("Waiting for message");
let frame = tokio::select! {
res = timeout(Duration::from_secs(idle_time) , in_rx.recv()) => {
res = timeout(Duration::from_secs(idle_time_seconds) , in_rx.recv()) => {
match res {
Ok(maybe_message) => {
idle_time = 1;
idle_time_seconds = 1;
match maybe_message {
Some(m) => m,
None => return Ok(())
}
},
Err(_) => {
match idle_time {
0..=10 => trace!("Connection Idle for more than {} seconds {}", idle_time, self.conn_details),
11..=35 => trace!("Connection Idle for more than {} seconds {}", idle_time, self.conn_details),
_ => {
debug!("Dropping Connection Idle for more than {} seconds {}", idle_time, self.conn_details);
return Ok(())
}
if idle_time_seconds < 35 {
trace!("Connection Idle for more than {} seconds {}", idle_time_seconds, self.conn_details);
} else {
debug!("Dropping. Connection Idle for more than {} seconds {}", idle_time_seconds, self.conn_details);
return Ok(());
}
idle_time *= 2;
idle_time_seconds *= 2;
continue
}
}
Expand Down Expand Up @@ -389,16 +381,6 @@ impl<C: Codec + 'static> Handler<C> {
return Ok(());
}
}

// match frame {
// Ok(message) => {
//
// }
// Err(e) => {
// trace!("Error handling message in TcpStream source: {:?}", e);
// return Ok(());
// }
// }
}

Ok(())
Expand Down Expand Up @@ -462,7 +444,7 @@ impl Shutdown {
}

// Cannot receive a "lag error" as only one value is ever sent.
let _ = self.notify.recv().await;
self.notify.recv().await.unwrap();

// Remember that the signal has been received.
self.shutdown = true;
Expand Down
4 changes: 0 additions & 4 deletions shotover-proxy/src/sources/cassandra_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ pub struct CassandraSource {
}

impl CassandraSource {
//"127.0.0.1:9043
pub async fn new(
chain: &TransformChain,
listen_addr: String,
Expand All @@ -68,7 +67,6 @@ impl CassandraSource {
connection_limit: Option<usize>,
hard_connection_limit: Option<bool>,
) -> CassandraSource {
// let listener = TcpListener::bind(listen_addr.clone()).await.unwrap();
let name = "Cassandra Source";

info!("Starting Cassandra source on [{}]", listen_addr);
Expand Down Expand Up @@ -97,8 +95,6 @@ impl CassandraSource {
}
}

// listener.run().await?;

let TcpCodecListener {
notify_shutdown,
shutdown_complete_tx,
Expand Down
16 changes: 2 additions & 14 deletions shotover-proxy/src/sources/redis_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ pub struct RedisSource {
}

impl RedisSource {
//"127.0.0.1:9043
pub async fn new(
chain: &TransformChain,
listen_addr: String,
Expand All @@ -64,15 +63,6 @@ impl RedisSource {
connection_limit: Option<usize>,
hard_connection_limit: Option<bool>,
) -> RedisSource {
// let mut socket =
// socket2::Socket::new(Domain::ipv4(), Type::stream(), Some(Protocol::tcp())).unwrap();
// let addr = listen_addr.clone().parse::<SocketAddrV4>().unwrap();
// socket.bind(&addr.into()).unwrap();
// socket.listen(10).unwrap();
//
// // let listener = TcpListener::bind(listen_addr.clone()).await.unwrap();
// let listener = TcpListener::from_std(socket.into_tcp_listener()).unwrap();

info!("Starting Redis source on [{}]", listen_addr);
let name = "Redis Source";

Expand All @@ -88,7 +78,7 @@ impl RedisSource {
shutdown_complete_tx,
};

let jh = Handle::current().spawn(async move {
let join_handle = Handle::current().spawn(async move {
tokio::select! {
res = listener.run() => {
if let Err(err) = res {
Expand All @@ -109,14 +99,12 @@ impl RedisSource {
drop(shutdown_complete_tx);
drop(notify_shutdown);

// let _ shutd

Ok(())
});

RedisSource {
name,
join_handle: jh,
join_handle,
listen_addr: listen_addr.clone(),
}
}
Expand Down

0 comments on commit 00a8b04

Please sign in to comment.