metrics: add connection metrics
Implement gathering of connection metrics, such as the total number
of active connections, connection timeouts, and request timeouts.

Co-authored-by: Wojciech Przytuła <59482568+wprzytula@users.noreply.github.com>
QuerthDP and wprzytula committed Jan 27, 2025
1 parent a0e3a9e commit b3ae501
Showing 9 changed files with 109 additions and 3 deletions.
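Note: the Metrics type used throughout this diff (crate::observability::metrics::Metrics) is not itself shown here. As a rough sketch only, assuming plain atomic counters (the driver's real implementation may differ), the counter methods this commit calls could look like this:

use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical stand-in for the driver's Metrics type; method names mirror
// the calls made in this diff, but the real struct may hold more state.
#[derive(Default)]
pub struct Metrics {
    total_connections: AtomicU64,
    connection_timeouts: AtomicU64,
    request_timeouts: AtomicU64,
}

impl Metrics {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn inc_total_connections(&self) {
        self.total_connections.fetch_add(1, Ordering::Relaxed);
    }

    pub fn dec_total_connections(&self) {
        self.total_connections.fetch_sub(1, Ordering::Relaxed);
    }

    pub fn inc_connection_timeouts(&self) {
        self.connection_timeouts.fetch_add(1, Ordering::Relaxed);
    }

    pub fn inc_request_timeouts(&self) {
        self.request_timeouts.fetch_add(1, Ordering::Relaxed);
    }
}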
10 changes: 8 additions & 2 deletions scylla/src/client/session.rs
@@ -1005,6 +1005,8 @@ where
keepalive_interval: config.keepalive_interval,
};

let metrics = Arc::new(Metrics::new());

let cluster = Cluster::new(
known_nodes,
pool_config,
@@ -1013,6 +1015,7 @@ where
config.host_filter,
config.cluster_metadata_refresh_interval,
tablet_receiver,
metrics.clone(),
)
.await?;

@@ -1022,7 +1025,7 @@ where
cluster,
default_execution_profile_handle,
schema_agreement_interval: config.schema_agreement_interval,
metrics: Arc::new(Metrics::new()),
metrics,
schema_agreement_timeout: config.schema_agreement_timeout,
schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting,
refresh_metadata_on_auto_schema_agreement: config
@@ -1985,7 +1988,10 @@ where
Some(timeout) => tokio::time::timeout(timeout, runner)
.await
.map(|res| res.map_err(RequestError::from))
.unwrap_or_else(|_| Err(RequestError::RequestTimeout(timeout))),
.unwrap_or_else(|_: tokio::time::error::Elapsed| {
self.metrics.inc_request_timeouts();
Err(RequestError::RequestTimeout(timeout))
}),
None => runner.await.map_err(RequestError::from),
};

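For context on the session.rs hunk above: the request runner is wrapped in tokio::time::timeout, and the new request-timeout counter is bumped whenever the timer elapses before the request completes. A self-contained sketch of the same pattern, using simplified stand-ins for the driver's Metrics and error types:

use std::sync::Arc;
use std::time::Duration;

// Simplified stand-ins, for illustration only.
struct Metrics { /* counters elided */ }
impl Metrics { fn inc_request_timeouts(&self) {} }

#[derive(Debug)]
enum RequestError { RequestTimeout(Duration) }

async fn run_with_timeout<T>(
    metrics: Arc<Metrics>,
    timeout: Option<Duration>,
    runner: impl std::future::Future<Output = Result<T, RequestError>>,
) -> Result<T, RequestError> {
    match timeout {
        // tokio::time::timeout yields Err(Elapsed) when the deadline passes
        // before the inner future resolves; count that as a request timeout.
        Some(timeout) => tokio::time::timeout(timeout, runner)
            .await
            .unwrap_or_else(|_elapsed| {
                metrics.inc_request_timeouts();
                Err(RequestError::RequestTimeout(timeout))
            }),
        None => runner.await,
    }
}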
11 changes: 10 additions & 1 deletion scylla/src/cluster/metadata.rs
@@ -22,6 +22,7 @@ use crate::deserialize::DeserializeOwnedRow;
use crate::errors::{DbError, NewSessionError, QueryError, RequestAttemptError};
use crate::frame::response::event::Event;
use crate::network::{Connection, ConnectionConfig, NodeConnectionPool, PoolConfig, PoolSize};
use crate::observability::metrics::Metrics;
use crate::policies::host_filter::HostFilter;
use crate::routing::Token;
use crate::statement::query::Query;
@@ -82,6 +83,8 @@ pub(crate) struct MetadataReader {
// When a control connection breaks, the PoolRefiller of its pool uses the requester
// to signal ClusterWorker that an immediate metadata refresh is advisable.
control_connection_repair_requester: broadcast::Sender<()>,

metrics: Arc<Metrics>,
}

/// Describes all metadata retrieved from the cluster
@@ -505,6 +508,7 @@ impl MetadataReader {
keyspaces_to_fetch: Vec<String>,
fetch_schema: bool,
host_filter: &Option<Arc<dyn HostFilter>>,
metrics: Arc<Metrics>,
) -> Result<Self, NewSessionError> {
let (initial_peers, resolved_hostnames) =
resolve_contact_points(&initial_known_nodes).await;
@@ -532,6 +536,7 @@
connection_config.clone(),
keepalive_interval,
control_connection_repair_requester.clone(),
metrics.clone(),
);

Ok(MetadataReader {
@@ -548,6 +553,7 @@ impl MetadataReader {
host_filter: host_filter.clone(),
initial_known_nodes,
control_connection_repair_requester,
metrics,
})
}

@@ -653,6 +659,7 @@ impl MetadataReader {
self.connection_config.clone(),
self.keepalive_interval,
self.control_connection_repair_requester.clone(),
self.metrics.clone(),
);

debug!(
@@ -751,6 +758,7 @@ impl MetadataReader {
self.connection_config.clone(),
self.keepalive_interval,
self.control_connection_repair_requester.clone(),
self.metrics.clone(),
);
}
}
@@ -762,6 +770,7 @@ impl MetadataReader {
connection_config: ConnectionConfig,
keepalive_interval: Option<Duration>,
refresh_requester: broadcast::Sender<()>,
metrics: Arc<Metrics>,
) -> NodeConnectionPool {
let pool_config = PoolConfig {
connection_config,
@@ -775,7 +784,7 @@
can_use_shard_aware_port: false,
};

NodeConnectionPool::new(endpoint, pool_config, None, refresh_requester)
NodeConnectionPool::new(endpoint, pool_config, None, refresh_requester, metrics)
}
}

3 changes: 3 additions & 0 deletions scylla/src/cluster/node.rs
@@ -7,6 +7,7 @@ use crate::errors::{ConnectionPoolError, QueryError};
use crate::network::Connection;
use crate::network::VerifiedKeyspaceName;
use crate::network::{NodeConnectionPool, PoolConfig};
use crate::observability::metrics::Metrics;
/// Node represents a cluster node along with its data and connections
use crate::routing::{Shard, Sharder};

@@ -99,6 +100,7 @@ impl Node {
pool_config: PoolConfig,
keyspace_name: Option<VerifiedKeyspaceName>,
enabled: bool,
metrics: Arc<Metrics>,
) -> Self {
let host_id = peer.host_id;
let address = peer.address;
@@ -113,6 +115,7 @@ impl Node {
pool_config,
keyspace_name,
pool_empty_notifier,
metrics,
)
});

4 changes: 4 additions & 0 deletions scylla/src/cluster/state.rs
@@ -1,5 +1,6 @@
use crate::errors::{BadQuery, QueryError};
use crate::network::{Connection, PoolConfig, VerifiedKeyspaceName};
use crate::observability::metrics::Metrics;
use crate::policies::host_filter::HostFilter;
use crate::prepared_statement::TokenCalculationError;
use crate::routing::locator::tablets::{RawTablet, Tablet, TabletsInfo};
@@ -57,6 +58,7 @@ impl ClusterState {

/// Creates new ClusterState using information about topology held in `metadata`.
/// Uses provided `known_peers` hashmap to recycle nodes if possible.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn new(
metadata: Metadata,
pool_config: &PoolConfig,
@@ -65,6 +67,7 @@ impl ClusterState {
host_filter: Option<&dyn HostFilter>,
mut tablets: TabletsInfo,
old_keyspaces: &HashMap<String, Keyspace>,
metrics: &Arc<Metrics>,
) -> Self {
// Create new updated known_peers and ring
let mut new_known_peers: HashMap<Uuid, Arc<Node>> =
@@ -99,6 +102,7 @@ impl ClusterState {
pool_config.clone(),
used_keyspace.clone(),
is_enabled,
metrics.clone(),
))
}
};
10 changes: 10 additions & 0 deletions scylla/src/cluster/worker.rs
@@ -2,6 +2,7 @@ use crate::client::session::TABLET_CHANNEL_SIZE;
use crate::errors::{NewSessionError, QueryError};
use crate::frame::response::event::{Event, StatusChangeEvent};
use crate::network::{PoolConfig, VerifiedKeyspaceName};
use crate::observability::metrics::Metrics;
use crate::policies::host_filter::HostFilter;
use crate::routing::locator::tablets::{RawTablet, TabletsInfo};

@@ -91,6 +92,8 @@ struct ClusterWorker {
// This value determines how frequently the cluster
// worker will refresh the cluster metadata
cluster_metadata_refresh_interval: Duration,

metrics: Arc<Metrics>,
}

#[derive(Debug)]
@@ -105,6 +108,7 @@ struct UseKeyspaceRequest {
}

impl Cluster {
#[allow(clippy::too_many_arguments)]
pub(crate) async fn new(
known_nodes: Vec<InternalKnownNode>,
pool_config: PoolConfig,
@@ -113,6 +117,7 @@ impl Cluster {
host_filter: Option<Arc<dyn HostFilter>>,
cluster_metadata_refresh_interval: Duration,
tablet_receiver: tokio::sync::mpsc::Receiver<(TableSpec<'static>, RawTablet)>,
metrics: Arc<Metrics>,
) -> Result<Cluster, NewSessionError> {
let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32);
let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32);
@@ -129,6 +134,7 @@ impl Cluster {
keyspaces_to_fetch,
fetch_schema_metadata,
&host_filter,
metrics.clone(),
)
.await?;

@@ -141,6 +147,7 @@ impl Cluster {
host_filter.as_deref(),
TabletsInfo::new(),
&HashMap::new(),
&metrics,
)
.await;
cluster_data.wait_until_all_pools_are_initialized().await;
@@ -163,6 +170,8 @@ impl Cluster {

host_filter,
cluster_metadata_refresh_interval,

metrics,
};

let (fut, worker_handle) = worker.work().remote_handle();
@@ -415,6 +424,7 @@ impl ClusterWorker {
self.host_filter.as_deref(),
cluster_data.locator.tablets.clone(),
&cluster_data.keyspaces,
&self.metrics,
)
.await,
);
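The cluster/state.rs and cluster/worker.rs changes above are parameter plumbing: the Arc<Metrics> created in Session is cloned down into Cluster, ClusterWorker, ClusterState, each Node, and finally each NodeConnectionPool, so every layer records into the same counters. A toy sketch of that sharing pattern (the types here are illustrative stand-ins, not the driver's real structs):

use std::sync::Arc;

struct Metrics { /* counters elided */ }
struct Cluster { metrics: Arc<Metrics> }
struct Session { metrics: Arc<Metrics>, cluster: Cluster }

impl Session {
    fn connect() -> Session {
        // One Metrics instance per session; every component that records
        // metrics holds a cloned Arc pointing at the same counters.
        let metrics = Arc::new(Metrics { /* ... */ });
        let cluster = Cluster { metrics: metrics.clone() };
        Session { metrics, cluster }
    }
}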
27 changes: 27 additions & 0 deletions scylla/src/network/connection_pool.rs
@@ -12,6 +12,8 @@ use crate::routing::{Shard, ShardCount, Sharder};

use crate::cluster::metadata::{PeerEndpoint, UntranslatedEndpoint};

use crate::observability::metrics::Metrics;

#[cfg(feature = "cloud")]
use crate::cluster::node::resolve_hostname;

@@ -172,6 +174,7 @@ impl NodeConnectionPool {
#[allow(unused_mut)] mut pool_config: PoolConfig, // `mut` needed only with "cloud" feature
current_keyspace: Option<VerifiedKeyspaceName>,
pool_empty_notifier: broadcast::Sender<()>,
metrics: Arc<Metrics>,
) -> Self {
let (use_keyspace_request_sender, use_keyspace_request_receiver) = mpsc::channel(1);
let pool_updated_notify = Arc::new(Notify::new());
@@ -209,6 +212,7 @@ impl NodeConnectionPool {
current_keyspace,
pool_updated_notify.clone(),
pool_empty_notifier,
metrics,
);

let conns = refiller.get_shared_connections();
@@ -476,6 +480,8 @@ struct PoolRefiller {

// Signaled when the connection pool becomes empty
pool_empty_notifier: broadcast::Sender<()>,

metrics: Arc<Metrics>,
}

#[derive(Debug)]
@@ -491,6 +497,7 @@ impl PoolRefiller {
current_keyspace: Option<VerifiedKeyspaceName>,
pool_updated_notify: Arc<Notify>,
pool_empty_notifier: broadcast::Sender<()>,
metrics: Arc<Metrics>,
) -> Self {
// At the beginning, we assume the node does not have any shards
// and assume that the node is a Cassandra node
@@ -519,6 +526,8 @@ impl PoolRefiller {

pool_updated_notify,
pool_empty_notifier,

metrics,
}
}

@@ -922,6 +931,17 @@ impl PoolRefiller {
// As this may involve resolving a hostname, the whole operation is async.
let endpoint_fut = self.maybe_translate_for_serverless(endpoint);

let count_in_metrics = {
let metrics = self.metrics.clone();
move |connect_result: &Result<_, ConnectionError>| {
if connect_result.is_ok() {
metrics.inc_total_connections();
} else if let Err(ConnectionError::ConnectTimeout) = &connect_result {
metrics.inc_connection_timeouts();
}
}
};

let fut = match (self.sharder.clone(), self.shard_aware_port, shard) {
(Some(sharder), Some(port), Some(shard)) => async move {
let shard_aware_endpoint = {
@@ -936,6 +956,9 @@
&cfg,
)
.await;

count_in_metrics(&result);

OpenedConnectionEvent {
result,
requested_shard: Some(shard),
@@ -946,6 +969,9 @@
_ => async move {
let non_shard_aware_endpoint = endpoint_fut.await;
let result = open_connection(non_shard_aware_endpoint, None, &cfg).await;

count_in_metrics(&result);

OpenedConnectionEvent {
result,
requested_shard: None,
@@ -1022,6 +1048,7 @@ impl PoolRefiller {
match maybe_idx {
Some(idx) => {
v.swap_remove(idx);
self.metrics.dec_total_connections();
true
}
None => false,
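Summary of the connection_pool.rs changes above: a single count_in_metrics closure is shared by both connection-opening paths, incrementing the active-connection gauge on success and the connection-timeout counter when the attempt fails with ConnectionError::ConnectTimeout; the gauge is decremented again when a connection is removed from the pool. A simplified standalone sketch of that pattern (Metrics, ConnectionError, and Connection here are stand-ins, not the driver's real types):

use std::sync::Arc;

// Simplified stand-ins, for illustration only.
struct Metrics { /* counters elided */ }
impl Metrics {
    fn inc_total_connections(&self) {}
    fn inc_connection_timeouts(&self) {}
    fn dec_total_connections(&self) {}
}

enum ConnectionError { ConnectTimeout, Broken }
struct Connection;

// One closure reused by every connection-opening code path.
fn make_counter(metrics: Arc<Metrics>) -> impl Fn(&Result<Connection, ConnectionError>) {
    move |connect_result: &Result<Connection, ConnectionError>| match connect_result {
        Ok(_) => metrics.inc_total_connections(),
        Err(ConnectionError::ConnectTimeout) => metrics.inc_connection_timeouts(),
        Err(_) => {}
    }
}

// Keep the gauge in sync when a pooled connection is dropped.
fn on_connection_removed(metrics: &Metrics) {
    metrics.dec_total_connections();
}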