Skip to content

Commit

Permalink
Make the connection pool grow slowly
Browse files Browse the repository at this point in the history
Previously it always created the maximum number of connections all at once.
When the limit is large (e.g. 256/512), this can take seconds as it is also
multiplied by the number of nodes, further worsened by the bug(?) where even
good connections gets discarded if any of the connections have been closed.
  • Loading branch information
XA21X committed Jul 22, 2021
1 parent 7ab69f4 commit f0cb297
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions src/transforms/util/cluster_connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,26 @@ impl<C: Codec + 'static, A: Authenticator<T>, T: Token> ConnectionPool<C, A, T>
let mut lanes = self.lanes.lock().await;
let lane = lanes.entry(token.clone()).or_insert_with(HashMap::new);

if let Some(xs) = lane.get(&address) {
// TODO: Reuse connections and not let one bad apple spoil the batch?
if xs.iter().all(|x| !x.is_closed()) {
return Ok(xs.clone());
}
let mut connections: Vec<Connection> = lane
.get(&address)
.map(|existing_connections| {
existing_connections
.iter()
.filter(|connection| !connection.is_closed())
.take(connection_count)
.cloned()
})
.into_iter()
.flatten()
.collect();

let shortfall = connection_count - connections.len();

if shortfall > 0 {
// We need more connections, but let's create one at a time for now.
connections.append(&mut self.new_connections(&address, token, 1).await?);
}

let connections = self
.new_connections(&address, token, connection_count)
.await?;

// NOTE: This replaces the whole lane (i.e. disowns the old one).
// IDEA: Maintain weak references so the pool can track active count including disowned?
lane.insert(address, connections.clone());
Expand Down

0 comments on commit f0cb297

Please sign in to comment.