From b3ae5012eb3835cd8a9e386f98946056beb3ed3c Mon Sep 17 00:00:00 2001
From: Dawid Pawlik <501149991dp@gmail.com>
Date: Sun, 12 Jan 2025 19:18:24 +0100
Subject: [PATCH] metrics: add connection metrics
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Implement gathering of connection metrics: the total number of active
connections, the number of connection timeouts, and the number of
request timeouts.

Co-authored-by: Wojciech Przytuła <59482568+wprzytula@users.noreply.github.com>
---
 scylla/src/client/session.rs                  | 10 ++++-
 scylla/src/cluster/metadata.rs                | 11 ++++-
 scylla/src/cluster/node.rs                    |  3 ++
 scylla/src/cluster/state.rs                   |  4 ++
 scylla/src/cluster/worker.rs                  | 10 +++++
 scylla/src/network/connection_pool.rs         | 27 ++++++++++++
 scylla/src/observability/metrics.rs           | 43 +++++++++++++++++++
 scylla/src/policies/load_balancing/default.rs |  3 ++
 scylla/src/routing/locator/test.rs            |  1 +
 9 files changed, 109 insertions(+), 3 deletions(-)

diff --git a/scylla/src/client/session.rs b/scylla/src/client/session.rs
index 17b7807c6..9f1bf15b0 100644
--- a/scylla/src/client/session.rs
+++ b/scylla/src/client/session.rs
@@ -1005,6 +1005,8 @@ where
             keepalive_interval: config.keepalive_interval,
         };
 
+        let metrics = Arc::new(Metrics::new());
+
         let cluster = Cluster::new(
             known_nodes,
             pool_config,
@@ -1013,6 +1015,7 @@ where
             config.host_filter,
             config.cluster_metadata_refresh_interval,
             tablet_receiver,
+            metrics.clone(),
         )
         .await?;
 
@@ -1022,7 +1025,7 @@ where
             cluster,
             default_execution_profile_handle,
             schema_agreement_interval: config.schema_agreement_interval,
-            metrics: Arc::new(Metrics::new()),
+            metrics,
             schema_agreement_timeout: config.schema_agreement_timeout,
             schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting,
             refresh_metadata_on_auto_schema_agreement: config
@@ -1985,7 +1988,10 @@ where
             Some(timeout) => tokio::time::timeout(timeout, runner)
                 .await
                 .map(|res| res.map_err(RequestError::from))
-                .unwrap_or_else(|_| Err(RequestError::RequestTimeout(timeout))),
+                .unwrap_or_else(|_: tokio::time::error::Elapsed| {
+                    self.metrics.inc_request_timeouts();
+                    Err(RequestError::RequestTimeout(timeout))
+                }),
             None => runner.await.map_err(RequestError::from),
         };
 
diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs
index 904bcf967..2e8d28b72 100644
--- a/scylla/src/cluster/metadata.rs
+++ b/scylla/src/cluster/metadata.rs
@@ -22,6 +22,7 @@ use crate::deserialize::DeserializeOwnedRow;
 use crate::errors::{DbError, NewSessionError, QueryError, RequestAttemptError};
 use crate::frame::response::event::Event;
 use crate::network::{Connection, ConnectionConfig, NodeConnectionPool, PoolConfig, PoolSize};
+use crate::observability::metrics::Metrics;
 use crate::policies::host_filter::HostFilter;
 use crate::routing::Token;
 use crate::statement::query::Query;
@@ -82,6 +83,8 @@ pub(crate) struct MetadataReader {
     // When a control connection breaks, the PoolRefiller of its pool uses the requester
     // to signal ClusterWorker that an immediate metadata refresh is advisable.
     control_connection_repair_requester: broadcast::Sender<()>,
+
+    metrics: Arc<Metrics>,
 }
 
 /// Describes all metadata retrieved from the cluster
@@ -505,6 +508,7 @@ impl MetadataReader {
         keyspaces_to_fetch: Vec<String>,
         fetch_schema: bool,
         host_filter: &Option<Arc<dyn HostFilter>>,
+        metrics: Arc<Metrics>,
     ) -> Result<Self, NewSessionError> {
         let (initial_peers, resolved_hostnames) =
             resolve_contact_points(&initial_known_nodes).await;
@@ -532,6 +536,7 @@ impl MetadataReader {
             connection_config.clone(),
             keepalive_interval,
             control_connection_repair_requester.clone(),
+            metrics.clone(),
         );
 
         Ok(MetadataReader {
@@ -548,6 +553,7 @@ impl MetadataReader {
             host_filter: host_filter.clone(),
             initial_known_nodes,
             control_connection_repair_requester,
+            metrics,
         })
     }
 
@@ -653,6 +659,7 @@ impl MetadataReader {
                 self.connection_config.clone(),
                 self.keepalive_interval,
                 self.control_connection_repair_requester.clone(),
+                self.metrics.clone(),
             );
 
             debug!(
@@ -751,6 +758,7 @@ impl MetadataReader {
                 self.connection_config.clone(),
                 self.keepalive_interval,
                 self.control_connection_repair_requester.clone(),
+                self.metrics.clone(),
             );
         }
     }
@@ -762,6 +770,7 @@ impl MetadataReader {
         connection_config: ConnectionConfig,
         keepalive_interval: Option<Duration>,
         refresh_requester: broadcast::Sender<()>,
+        metrics: Arc<Metrics>,
     ) -> NodeConnectionPool {
         let pool_config = PoolConfig {
             connection_config,
@@ -775,7 +784,7 @@ impl MetadataReader {
             can_use_shard_aware_port: false,
         };
 
-        NodeConnectionPool::new(endpoint, pool_config, None, refresh_requester)
+        NodeConnectionPool::new(endpoint, pool_config, None, refresh_requester, metrics)
     }
 }
 
diff --git a/scylla/src/cluster/node.rs b/scylla/src/cluster/node.rs
index 808902c23..acffa670d 100644
--- a/scylla/src/cluster/node.rs
+++ b/scylla/src/cluster/node.rs
@@ -7,6 +7,7 @@ use crate::errors::{ConnectionPoolError, QueryError};
 use crate::network::Connection;
 use crate::network::VerifiedKeyspaceName;
 use crate::network::{NodeConnectionPool, PoolConfig};
+use crate::observability::metrics::Metrics;
 /// Node represents a cluster node along with its data and connections
 use crate::routing::{Shard, Sharder};
 
@@ -99,6 +100,7 @@ impl Node {
         pool_config: PoolConfig,
         keyspace_name: Option<VerifiedKeyspaceName>,
         enabled: bool,
+        metrics: Arc<Metrics>,
     ) -> Self {
         let host_id = peer.host_id;
         let address = peer.address;
@@ -113,6 +115,7 @@ impl Node {
                 pool_config,
                 keyspace_name,
                 pool_empty_notifier,
+                metrics,
             )
         });
 
diff --git a/scylla/src/cluster/state.rs b/scylla/src/cluster/state.rs
index 973d585b8..1fdc3b215 100644
--- a/scylla/src/cluster/state.rs
+++ b/scylla/src/cluster/state.rs
@@ -1,5 +1,6 @@
 use crate::errors::{BadQuery, QueryError};
 use crate::network::{Connection, PoolConfig, VerifiedKeyspaceName};
+use crate::observability::metrics::Metrics;
 use crate::policies::host_filter::HostFilter;
 use crate::prepared_statement::TokenCalculationError;
 use crate::routing::locator::tablets::{RawTablet, Tablet, TabletsInfo};
@@ -57,6 +58,7 @@ impl ClusterState {
 
     /// Creates new ClusterState using information about topology held in `metadata`.
     /// Uses provided `known_peers` hashmap to recycle nodes if possible.
+    #[allow(clippy::too_many_arguments)]
     pub(crate) async fn new(
         metadata: Metadata,
         pool_config: &PoolConfig,
@@ -65,6 +67,7 @@ impl ClusterState {
         host_filter: Option<&dyn HostFilter>,
         mut tablets: TabletsInfo,
         old_keyspaces: &HashMap<String, Keyspace>,
+        metrics: &Arc<Metrics>,
     ) -> Self {
         // Create new updated known_peers and ring
         let mut new_known_peers: HashMap<Uuid, Arc<Node>> =
@@ -99,6 +102,7 @@ impl ClusterState {
                     pool_config.clone(),
                     used_keyspace.clone(),
                     is_enabled,
+                    metrics.clone(),
                 ))
             }
         };
 
diff --git a/scylla/src/cluster/worker.rs b/scylla/src/cluster/worker.rs
index 3dab166fc..59e58b935 100644
--- a/scylla/src/cluster/worker.rs
+++ b/scylla/src/cluster/worker.rs
@@ -2,6 +2,7 @@ use crate::client::session::TABLET_CHANNEL_SIZE;
 use crate::errors::{NewSessionError, QueryError};
 use crate::frame::response::event::{Event, StatusChangeEvent};
 use crate::network::{PoolConfig, VerifiedKeyspaceName};
+use crate::observability::metrics::Metrics;
 use crate::policies::host_filter::HostFilter;
 use crate::routing::locator::tablets::{RawTablet, TabletsInfo};
 
@@ -91,6 +92,8 @@ struct ClusterWorker {
     // This value determines how frequently the cluster
     // worker will refresh the cluster metadata
     cluster_metadata_refresh_interval: Duration,
+
+    metrics: Arc<Metrics>,
 }
 
 #[derive(Debug)]
@@ -105,6 +108,7 @@ struct UseKeyspaceRequest {
 }
 
 impl Cluster {
+    #[allow(clippy::too_many_arguments)]
     pub(crate) async fn new(
         known_nodes: Vec<InternalKnownNode>,
         pool_config: PoolConfig,
@@ -113,6 +117,7 @@ impl Cluster {
         host_filter: Option<Arc<dyn HostFilter>>,
         cluster_metadata_refresh_interval: Duration,
         tablet_receiver: tokio::sync::mpsc::Receiver<(TableSpec<'static>, RawTablet)>,
+        metrics: Arc<Metrics>,
     ) -> Result<Cluster, NewSessionError> {
         let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32);
         let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32);
@@ -129,6 +134,7 @@ impl Cluster {
             keyspaces_to_fetch,
             fetch_schema_metadata,
             &host_filter,
+            metrics.clone(),
         )
         .await?;
 
@@ -141,6 +147,7 @@ impl Cluster {
             host_filter.as_deref(),
             TabletsInfo::new(),
             &HashMap::new(),
+            &metrics,
         )
         .await;
         cluster_data.wait_until_all_pools_are_initialized().await;
@@ -163,6 +170,8 @@ impl Cluster {
 
             host_filter,
             cluster_metadata_refresh_interval,
+
+            metrics,
         };
 
         let (fut, worker_handle) = worker.work().remote_handle();
@@ -415,6 +424,7 @@ impl ClusterWorker {
                 self.host_filter.as_deref(),
                 cluster_data.locator.tablets.clone(),
                 &cluster_data.keyspaces,
+                &self.metrics,
             )
             .await,
         );
 
diff --git a/scylla/src/network/connection_pool.rs b/scylla/src/network/connection_pool.rs
index 2390365f2..b431e7857 100644
--- a/scylla/src/network/connection_pool.rs
+++ b/scylla/src/network/connection_pool.rs
@@ -12,6 +12,8 @@ use crate::routing::{Shard, ShardCount, Sharder};
 
 use crate::cluster::metadata::{PeerEndpoint, UntranslatedEndpoint};
 
+use crate::observability::metrics::Metrics;
+
 #[cfg(feature = "cloud")]
 use crate::cluster::node::resolve_hostname;
 
@@ -172,6 +174,7 @@ impl NodeConnectionPool {
         #[allow(unused_mut)] mut pool_config: PoolConfig, // `mut` needed only with "cloud" feature
         current_keyspace: Option<VerifiedKeyspaceName>,
         pool_empty_notifier: broadcast::Sender<()>,
+        metrics: Arc<Metrics>,
     ) -> Self {
         let (use_keyspace_request_sender, use_keyspace_request_receiver) = mpsc::channel(1);
         let pool_updated_notify = Arc::new(Notify::new());
@@ -209,6 +212,7 @@ impl NodeConnectionPool {
             current_keyspace,
             pool_updated_notify.clone(),
             pool_empty_notifier,
+            metrics,
         );
 
         let conns = refiller.get_shared_connections();
@@ -476,6 +480,8 @@ struct PoolRefiller {
 
     // Signaled when the connection pool becomes empty
     pool_empty_notifier: broadcast::Sender<()>,
+
+    metrics: Arc<Metrics>,
 }
 
 #[derive(Debug)]
@@ -491,6 +497,7 @@ impl PoolRefiller {
         current_keyspace: Option<VerifiedKeyspaceName>,
         pool_updated_notify: Arc<Notify>,
         pool_empty_notifier: broadcast::Sender<()>,
+        metrics: Arc<Metrics>,
     ) -> Self {
         // At the beginning, we assume the node does not have any shards
         // and assume that the node is a Cassandra node
@@ -519,6 +526,8 @@ impl PoolRefiller {
 
             pool_updated_notify,
             pool_empty_notifier,
+
+            metrics,
         }
     }
 
@@ -922,6 +931,17 @@ impl PoolRefiller {
         // As this may involve resolving a hostname, the whole operation is async.
         let endpoint_fut = self.maybe_translate_for_serverless(endpoint);
 
+        let count_in_metrics = {
+            let metrics = self.metrics.clone();
+            move |connect_result: &Result<_, ConnectionError>| {
+                if connect_result.is_ok() {
+                    metrics.inc_total_connections();
+                } else if let Err(ConnectionError::ConnectTimeout) = &connect_result {
+                    metrics.inc_connection_timeouts();
+                }
+            }
+        };
+
         let fut = match (self.sharder.clone(), self.shard_aware_port, shard) {
             (Some(sharder), Some(port), Some(shard)) => async move {
                 let shard_aware_endpoint = {
@@ -936,6 +956,9 @@ impl PoolRefiller {
                     &cfg,
                 )
                 .await;
+
+                count_in_metrics(&result);
+
                 OpenedConnectionEvent {
                     result,
                     requested_shard: Some(shard),
@@ -946,6 +969,9 @@ impl PoolRefiller {
             _ => async move {
                 let non_shard_aware_endpoint = endpoint_fut.await;
                 let result = open_connection(non_shard_aware_endpoint, None, &cfg).await;
+
+                count_in_metrics(&result);
+
                 OpenedConnectionEvent {
                     result,
                     requested_shard: None,
@@ -1022,6 +1048,7 @@ impl PoolRefiller {
                 match maybe_idx {
                     Some(idx) => {
                         v.swap_remove(idx);
+                        self.metrics.dec_total_connections();
                         true
                     }
                     None => false,
 
diff --git a/scylla/src/observability/metrics.rs b/scylla/src/observability/metrics.rs
index 09c4a6433..9e74b9522 100644
--- a/scylla/src/observability/metrics.rs
+++ b/scylla/src/observability/metrics.rs
@@ -201,6 +201,9 @@ pub struct Metrics {
     retries_num: AtomicU64,
     histogram: Arc<LockFreeHistogram>,
     meter: Arc<Meter>,
+    total_connections: AtomicU64,
+    connection_timeouts: AtomicU64,
+    request_timeouts: AtomicU64,
 }
 
 impl Metrics {
@@ -213,6 +216,9 @@ impl Metrics {
             retries_num: AtomicU64::new(0),
             histogram: Arc::new(LockFreeHistogram::default()),
             meter: Arc::new(Meter::new()),
+            total_connections: AtomicU64::new(0),
+            connection_timeouts: AtomicU64::new(0),
+            request_timeouts: AtomicU64::new(0),
         }
     }
 
@@ -244,6 +250,28 @@ impl Metrics {
         self.retries_num.fetch_add(1, ORDER_TYPE);
     }
 
+    /// Increments the counter for the number of active connections to the cluster.
+    /// Should be called when opening a new connection, once per connection.
+    pub(crate) fn inc_total_connections(&self) {
+        self.total_connections.fetch_add(1, ORDER_TYPE);
+    }
+
+    /// Decrements the counter for the number of active connections to the cluster.
+    /// Should be called when closing a connection, once per connection.
+    pub(crate) fn dec_total_connections(&self) {
+        self.total_connections.fetch_sub(1, ORDER_TYPE);
+    }
+
+    /// Increments the counter for timeouts of new connections to the cluster.
+    pub(crate) fn inc_connection_timeouts(&self) {
+        self.connection_timeouts.fetch_add(1, ORDER_TYPE);
+    }
+
+    /// Increments the counter for client request timeouts.
+    pub(crate) fn inc_request_timeouts(&self) {
+        self.request_timeouts.fetch_add(1, ORDER_TYPE);
+    }
+
     /// Saves to histogram latency of completing single query.
     /// For paged queries it should log latency for every page.
     ///
@@ -345,6 +373,21 @@ impl Metrics {
         self.meter.fifteen_minute_rate()
     }
 
+    /// Returns the total number of active connections.
+    pub fn get_total_connections(&self) -> u64 {
+        self.total_connections.load(ORDER_TYPE)
+    }
+
+    /// Returns the counter of connection timeouts.
+    pub fn get_connection_timeouts(&self) -> u64 {
+        self.connection_timeouts.load(ORDER_TYPE)
+    }
+
+    /// Returns the counter of request timeouts.
+    pub fn get_request_timeouts(&self) -> u64 {
+        self.request_timeouts.load(ORDER_TYPE)
+    }
+
     // Metric implementations
 
     fn mean(h: &Histogram) -> Result<u64, MetricsError> {
 
diff --git a/scylla/src/policies/load_balancing/default.rs b/scylla/src/policies/load_balancing/default.rs
index 13a2540eb..01d8415ef 100644
--- a/scylla/src/policies/load_balancing/default.rs
+++ b/scylla/src/policies/load_balancing/default.rs
@@ -1420,6 +1420,7 @@ mod tests {
             None,
             TabletsInfo::new(),
             &HashMap::new(),
+            &Default::default(),
         )
         .await
     }
@@ -1451,6 +1452,7 @@ mod tests {
             None,
             TabletsInfo::new(),
             &HashMap::new(),
+            &Default::default(),
         )
         .await
     }
@@ -2501,6 +2503,7 @@ mod tests {
             },
             TabletsInfo::new(),
             &HashMap::new(),
+            &Default::default(),
         )
         .await;
 
diff --git a/scylla/src/routing/locator/test.rs b/scylla/src/routing/locator/test.rs
index e0205c101..fce8b868e 100644
--- a/scylla/src/routing/locator/test.rs
+++ b/scylla/src/routing/locator/test.rs
@@ -187,6 +187,7 @@ pub(crate) fn create_ring(metadata: &Metadata) -> impl Iterator
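
Usage sketch (not part of the patch): a minimal example of how an application
could read the counters introduced here. It assumes the driver's existing
`Session::get_metrics()` accessor, which returns the shared `Arc<Metrics>`
wired through this patch, and a node reachable at the address below; the
printed values are instantaneous snapshots of plain atomic counters.

    use scylla::{Session, SessionBuilder};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let session: Session = SessionBuilder::new()
            .known_node("127.0.0.1:9042")
            .build()
            .await?;

        // The three getters below are introduced in
        // scylla/src/observability/metrics.rs by this patch.
        let metrics = session.get_metrics();
        println!("active connections:  {}", metrics.get_total_connections());
        println!("connection timeouts: {}", metrics.get_connection_timeouts());
        println!("request timeouts:    {}", metrics.get_request_timeouts());

        Ok(())
    }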