Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make metrics collection optional/faster #1147

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ jobs:
run: cargo check --all-targets --manifest-path "scylla/Cargo.toml" --features "full-serialization"
- name: Cargo check with all features
run: cargo check --all-targets --manifest-path "scylla/Cargo.toml" --all-features
- name: Cargo check with metrics feature
run: cargo check --all-targets --manifest-path "scylla/Cargo.toml" --features "metrics"
- name: Cargo check with secrecy-08 feature
run: cargo check --all-targets --manifest-path "scylla/Cargo.toml" --features "secrecy-08"
- name: Cargo check with chrono-04 feature
Expand Down
7 changes: 5 additions & 2 deletions Cargo.lock.msrv

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 28 additions & 2 deletions docs/source/metrics/metrics.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Driver metrics

This feature is available only when the crate feature `metrics` is enabled.

During operation the driver collects various metrics.

They can be accessed at any moment using `Session::get_metrics()`
Expand All @@ -11,6 +13,9 @@ They can be accessed at any moment using `Session::get_metrics()`
* Total number of paged queries
* Number of errors during paged queries
* Number of retries
* Latency histogram statistics (min, max, mean, standard deviation, percentiles)
* Rates of queries per second in various time frames
* Number of active connections, as well as counts of connection timeouts and request timeouts

### Example
```rust
Expand All @@ -24,11 +29,32 @@ println!("Queries requested: {}", metrics.get_queries_num());
println!("Iter queries requested: {}", metrics.get_queries_iter_num());
println!("Errors occurred: {}", metrics.get_errors_num());
println!("Iter errors occurred: {}", metrics.get_errors_iter_num());
println!("Average latency: {}", metrics.get_latency_avg_ms().unwrap());
println!("Average latency: {}", metrics.get_latency_avg_ms()?);
println!(
"99.9 latency percentile: {}",
metrics.get_latency_percentile_ms(99.9).unwrap()
metrics.get_latency_percentile_ms(99.9)?
);

let snapshot = metrics.get_snapshot()?;
println!("Min: {}", snapshot.min);
println!("Max: {}", snapshot.max);
println!("Mean: {}", snapshot.mean);
println!("Standard deviation: {}", snapshot.stddev);
println!("Median: {}", snapshot.median);
println!("75th percentile: {}", snapshot.percentile_75);
println!("95th percentile: {}", snapshot.percentile_95);
println!("98th percentile: {}", snapshot.percentile_98);
println!("99th percentile: {}", snapshot.percentile_99);
println!("99.9th percentile: {}", snapshot.percentile_99_9);

println!("Mean rate: {}", metrics.get_mean_rate());
println!("One minute rate: {}", metrics.get_one_minute_rate());
println!("Five minute rate: {}", metrics.get_five_minute_rate());
println!("Fifteen minute rate: {}", metrics.get_fifteen_minute_rate());

println!("Total connections: {}", metrics.get_total_connections());
println!("Connection timeouts: {}", metrics.get_connection_timeouts());
println!("Requests timeouts: {}", metrics.get_request_timeouts());
# Ok(())
# }
```
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ scylla = { path = "../scylla", features = [
"num-bigint-03",
"num-bigint-04",
"bigdecimal-04",
"metrics",
QuerthDP marked this conversation as resolved.
Show resolved Hide resolved
] }
tokio = { version = "1.34", features = ["full"] }
tracing = { version = "0.1.25", features = ["log"] }
Expand Down
25 changes: 23 additions & 2 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,33 @@ async fn main() -> Result<()> {
println!("Iter queries requested: {}", metrics.get_queries_iter_num());
println!("Errors occurred: {}", metrics.get_errors_num());
println!("Iter errors occurred: {}", metrics.get_errors_iter_num());
println!("Average latency: {}", metrics.get_latency_avg_ms().unwrap());
println!("Average latency: {}", metrics.get_latency_avg_ms()?);
println!(
"99.9 latency percentile: {}",
metrics.get_latency_percentile_ms(99.9).unwrap()
metrics.get_latency_percentile_ms(99.9)?
);

let snapshot = metrics.get_snapshot()?;
println!("Min: {}", snapshot.min);
println!("Max: {}", snapshot.max);
println!("Mean: {}", snapshot.mean);
println!("Standard deviation: {}", snapshot.stddev);
println!("Median: {}", snapshot.median);
println!("75th percentile: {}", snapshot.percentile_75);
println!("95th percentile: {}", snapshot.percentile_95);
println!("98th percentile: {}", snapshot.percentile_98);
println!("99th percentile: {}", snapshot.percentile_99);
println!("99.9th percentile: {}", snapshot.percentile_99_9);

println!("Mean rate: {}", metrics.get_mean_rate());
println!("One minute rate: {}", metrics.get_one_minute_rate());
println!("Five minute rate: {}", metrics.get_five_minute_rate());
println!("Fifteen minute rate: {}", metrics.get_fifteen_minute_rate());

println!("Total connections: {}", metrics.get_total_connections());
println!("Connection timeouts: {}", metrics.get_connection_timeouts());
println!("Requests timeouts: {}", metrics.get_request_timeouts());

println!("Ok.");

Ok(())
Expand Down
3 changes: 2 additions & 1 deletion scylla/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ full-serialization = [
"num-bigint-04",
"bigdecimal-04",
]
metrics = ["dep:histogram"]

[dependencies]
scylla-macros = { version = "0.7.0", path = "../scylla-macros" }
Expand All @@ -47,7 +48,7 @@ byteorder = "1.3.4"
bytes = "1.0.1"
futures = "0.3.6"
hashbrown = "0.14"
histogram = "0.6.9"
histogram = { version = "0.11.1", optional = true }
tokio = { version = "1.34", features = [
"net",
"time",
Expand Down
13 changes: 12 additions & 1 deletion scylla/src/client/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::frame::response::result;
use crate::network::Connection;
use crate::observability::driver_tracing::RequestSpan;
use crate::observability::history::{self, HistoryListener};
#[cfg(feature = "metrics")]
use crate::observability::metrics::Metrics;
use crate::policies::load_balancing::{self, RoutingInfo};
use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
Expand Down Expand Up @@ -68,6 +69,7 @@ pub(crate) struct PreparedIteratorConfig {
pub(crate) values: SerializedValues,
pub(crate) execution_profile: Arc<ExecutionProfileInner>,
pub(crate) cluster_data: Arc<ClusterState>,
#[cfg(feature = "metrics")]
pub(crate) metrics: Arc<Metrics>,
}

Expand Down Expand Up @@ -141,6 +143,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> {
query_consistency: Consistency,
retry_session: Box<dyn RetrySession>,
execution_profile: Arc<ExecutionProfileInner>,
#[cfg(feature = "metrics")]
metrics: Arc<Metrics>,

paging_state: PagingState,
Expand Down Expand Up @@ -239,11 +242,13 @@ where

match retry_decision {
RetryDecision::RetrySameNode(cl) => {
#[cfg(feature = "metrics")]
self.metrics.inc_retries_num();
current_consistency = cl.unwrap_or(current_consistency);
continue 'same_node_retries;
}
RetryDecision::RetryNextNode(cl) => {
#[cfg(feature = "metrics")]
self.metrics.inc_retries_num();
current_consistency = cl.unwrap_or(current_consistency);
continue 'nodes_in_plan;
Expand Down Expand Up @@ -303,6 +308,7 @@ where
node: NodeRef<'_>,
request_span: &RequestSpan,
) -> Result<ControlFlow<PageSendAttemptedProof, ()>, RequestAttemptError> {
#[cfg(feature = "metrics")]
self.metrics.inc_total_paged_queries();
let query_start = std::time::Instant::now();

Expand All @@ -328,6 +334,7 @@ where
tracing_id,
..
}) => {
#[cfg(feature = "metrics")]
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
self.log_attempt_success();
self.log_request_success();
Expand Down Expand Up @@ -363,6 +370,7 @@ where
Ok(ControlFlow::Continue(()))
}
Err(err) => {
#[cfg(feature = "metrics")]
self.metrics.inc_failed_paged_queries();
self.execution_profile
.load_balancing_policy
Expand All @@ -382,6 +390,7 @@ where
Ok(ControlFlow::Break(proof))
}
Ok(response) => {
#[cfg(feature = "metrics")]
self.metrics.inc_failed_paged_queries();
let err =
RequestAttemptError::UnexpectedResponse(response.response.to_response_kind());
Expand Down Expand Up @@ -686,7 +695,7 @@ impl QueryPager {
query: Query,
execution_profile: Arc<ExecutionProfileInner>,
cluster_data: Arc<ClusterState>,
metrics: Arc<Metrics>,
#[cfg(feature = "metrics")] metrics: Arc<Metrics>,
) -> Result<Self, NextRowError> {
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);

Expand Down Expand Up @@ -749,6 +758,7 @@ impl QueryPager {
query_consistency: consistency,
retry_session,
execution_profile,
#[cfg(feature = "metrics")]
metrics,
paging_state: PagingState::start(),
history_listener: query.config.history_listener.clone(),
Expand Down Expand Up @@ -869,6 +879,7 @@ impl QueryPager {
query_consistency: consistency,
retry_session,
execution_profile: config.execution_profile,
#[cfg(feature = "metrics")]
metrics: config.metrics,
paging_state: PagingState::start(),
history_listener: config.prepared.config.history_listener.clone(),
Expand Down
Loading
Loading