Skip to content

Commit

Permalink
connection_pool: unified connection metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
wprzytula authored and QuerthDP committed Jan 27, 2025
1 parent 4997fc6 commit bcb3b77
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 20 deletions.
8 changes: 0 additions & 8 deletions scylla/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1905,21 +1905,13 @@ pub(super) async fn open_connection_to_shard_aware_port(
shard: Shard,
sharder: Sharder,
connection_config: &ConnectionConfig,
#[cfg(feature = "metrics")] metrics: &Metrics,
) -> Result<(Connection, ErrorReceiver), ConnectionError> {
// Create iterator over all possible source ports for this shard
let source_port_iter = sharder.iter_source_ports_for_shard(shard);

for port in source_port_iter {
let connect_result = open_connection(endpoint.clone(), Some(port), connection_config).await;

#[cfg(feature = "metrics")]
if connect_result.is_ok() {
metrics.inc_total_connections();
} else if let Err(ConnectionError::ConnectTimeout) = &connect_result {
metrics.inc_connection_timeouts();
}

match connect_result {
Err(err) if err.is_address_unavailable_for_use() => continue, // If we can't use this port, try the next one
result => return result,
Expand Down
27 changes: 15 additions & 12 deletions scylla/src/network/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,16 @@ impl PoolRefiller {
let endpoint_fut = self.maybe_translate_for_serverless(endpoint);

#[cfg(feature = "metrics")]
let metrics = self.metrics.clone();
let count_in_metrics = {
let metrics = self.metrics.clone();
move |connect_result: &Result<_, ConnectionError>| {
if connect_result.is_ok() {
metrics.inc_total_connections();
} else if let Err(ConnectionError::ConnectTimeout) = &connect_result {
metrics.inc_connection_timeouts();
}
}
};

let fut = match (self.sharder.clone(), self.shard_aware_port, shard) {
(Some(sharder), Some(port), Some(shard)) => async move {
Expand All @@ -949,10 +958,12 @@ impl PoolRefiller {
shard,
sharder.clone(),
&cfg,
#[cfg(feature = "metrics")]
&metrics,
)
.await;

#[cfg(feature = "metrics")]
count_in_metrics(&result);

OpenedConnectionEvent {
result,
requested_shard: Some(shard),
Expand All @@ -965,11 +976,7 @@ impl PoolRefiller {
let result = open_connection(non_shard_aware_endpoint, None, &cfg).await;

#[cfg(feature = "metrics")]
if result.is_ok() {
metrics.inc_total_connections();
} else if let Err(ConnectionError::ConnectTimeout) = &result {
metrics.inc_connection_timeouts();
}
count_in_metrics(&result);

OpenedConnectionEvent {
result,
Expand Down Expand Up @@ -1259,8 +1266,6 @@ mod tests {
// to the right shard
let sharder = Sharder::new(ShardCount::new(3).unwrap(), 12);

let metrics = Metrics::new();

// Open the connections
let mut conns = Vec::new();

Expand All @@ -1273,8 +1278,6 @@ mod tests {
0,
sharder.clone(),
&connection_config,
#[cfg(feature = "metrics")]
&metrics,
));
}

Expand Down

0 comments on commit bcb3b77

Please sign in to comment.