Skip to content

Commit

Permalink
tests: replace string matching on metrics with parsing (#859)
Browse files Browse the repository at this point in the history
This branch changes the metrics integration tests so that we no longer
make assertions about metrics by searching the entire metrics scrape for
a specific string. Instead, we now have a simple builder API for
constructing a type that tries to parse the metrics scrape to find
individual metrics with certain label combinations and/or values.

Advantages of the new approach include:
- it's not ordering dependent --- changing the label ordering doesn't
  change whether or not the match succeeds. for example, changes
  like #856 won't break the tests.
- the presence of *additional* labels doesn't break the assertions
  (although we may want to add an "exact" matching mode in case we
  *do* want to assert that other labels are not present).
- metrics can easily be matched with and without values
- the failure reporting is *much* better. when the scrape doesn't
  contain the required metric, we can now report the metric we wanted
  in a nicely formatted way, and show only a list of the "similar"
  metrics that were found. with the previous code, panics included the
  entire metrics scrape, which was very hard to read and debug.

This should make it much easier to modify the tests to add new metric
labels or add existing ones, since we no longer depend on specific
hard-coded strings, but are instead making assertions about the
properties we actually want to find.

Example of the new errors:

```
thread 'tests::telemetry::response_classification::inbound_http' panicked at 'did not find a `response_total` metric
  with labels: ["authority=\"tele.test.svc.cluster.local\"", "direction=\"inbound\"", "tls=\"disabled\"", "status_code=\"200 OK\"", "classification=\"success\""]
found metrics: [
    "response_total{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",tls=\"disabled\",status_code=\"200\",classification=\"success\"} 1",
    "response_total{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",tls=\"disabled\",status_code=\"304\",classification=\"success\"} 1",
]
  retried 5 times (73.314652ms)', linkerd/app/integration/src/metrics.rs:128:51
stack backtrace:
    ... snip ...
```

These *could* probably be improved even more, for example, by
highlighting the specific label that was missing. However, this felt
like a pretty significant improvement for a first pass.

In a follow-up, I'll also reduce some of the code duplication between
the tests that are repeated for inbound and outbound. While working
on this change, I found that modifying the tests was much harder since
almost all of them were duplicated on the inbound and outbound halves
of the proxy. I'll merge that in a separate PR to reduce the size of
this diff, though.
  • Loading branch information
hawkw authored Jan 19, 2021
1 parent a5c06f2 commit ac6e5b2
Show file tree
Hide file tree
Showing 6 changed files with 704 additions and 385 deletions.
17 changes: 11 additions & 6 deletions linkerd/app/integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,24 @@ macro_rules! assert_contains {

#[macro_export]
macro_rules! assert_eventually_contains {
($scrape:expr, $contains:expr) => {
($scrape:expr, $contains:expr) => {{
let mut res: Result<(), crate::metrics::MatchErr> = Ok(());
let res_ref = &mut res;
assert_eventually!(
$scrape.contains($contains),
"metrics scrape:\n{}\ndid not contain:\n{}",
$scrape,
$contains
{
*res_ref = $contains.is_in($scrape);
res_ref.is_ok()
},
"{}",
std::mem::replace(res_ref, Ok(())).unwrap_err(),
)
};
}};
}

pub mod client;
pub mod controller;
pub mod identity;
pub mod metrics;
pub mod proxy;
pub mod server;
pub mod tap;
Expand Down
178 changes: 178 additions & 0 deletions linkerd/app/integration/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
use std::fmt;
use std::string::ToString;

#[derive(Debug, Clone)]
pub struct MetricMatch {
name: String,
labels: Vec<String>,
value: Option<String>,
}

#[derive(Debug, Clone, Default)]
pub struct Labels(Vec<String>);

pub fn metric(name: impl Into<String>) -> MetricMatch {
MetricMatch::new(name)
}

pub fn labels() -> Labels {
Labels::default()
}

#[derive(Eq, PartialEq)]
pub struct MatchErr(String);

impl MetricMatch {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
labels: Vec::new(),
value: None,
}
}

pub fn label(mut self, key: impl AsRef<str>, val: impl fmt::Display) -> Self {
self.labels.push(format!("{}=\"{}\"", key.as_ref(), val));
self
}

pub fn value(self, value: impl ToString) -> Self {
Self {
value: Some(value.to_string()),
..self
}
}

pub fn set_value(&mut self, value: impl ToString) -> &mut Self {
self.value = Some(value.to_string());
self
}

pub fn is_not_in(&self, scrape: impl AsRef<str>) -> bool {
self.is_in(scrape).is_err()
}

pub fn is_in(&self, scrape: impl AsRef<str>) -> Result<(), MatchErr> {
let scrape = scrape.as_ref();
let lines = scrape
.lines()
.filter(|&line| line.starts_with(&self.name))
.collect::<Vec<&str>>();
let mut candidates = Vec::new();
'lines: for &line in &lines {
if let Some(labels) = line
.split('{')
.nth(1)
.and_then(|line| line.split('}').next())
{
for label in &self.labels {
if !labels.contains(&label[..]) {
continue 'lines;
}
}
candidates.push(line);
}
}

if candidates.is_empty() {
return Err(MatchErr(format!(
"did not find a `{}` metric\n with labels: {:?}\nfound metrics: {:#?}",
self.name, self.labels, lines
)));
}

if let Some(ref expected_value) = self.value {
for &line in &candidates {
if let Some(value) = line.split('}').nth(1) {
let value = value.trim();
if value == expected_value {
return Ok(());
}
}
}
return Err(MatchErr(format!(
"did not find a `{}` metric\n with labels: {:?}\n and value: {}\nfound metrics: {:#?}",
self.name, self.labels, expected_value, candidates
)));
}

Ok(())
}

#[track_caller]
pub async fn assert_in(&self, client: &crate::client::Client) {
use std::str::FromStr;
use std::time::{Duration, Instant};
use std::{env, u64};
use tracing::Instrument as _;
const MAX_RETRIES: usize = 5;
// TODO: don't do this *every* time eventually is called (lazy_static?)
let patience = env::var(crate::ENV_TEST_PATIENCE_MS)
.ok()
.map(|s| {
let millis = u64::from_str(&s).expect(
"Could not parse RUST_TEST_PATIENCE_MS environment \
variable.",
);
Duration::from_millis(millis)
})
.unwrap_or(crate::DEFAULT_TEST_PATIENCE);
async {
let start_t = Instant::now();
for i in 1..=MAX_RETRIES {
tracing::info!(retries_remaining = MAX_RETRIES - i);
let scrape = client.get("/metrics").await;
match self.is_in(scrape) {
Ok(()) => break,
Err(e) if i == MAX_RETRIES => panic!(
"{}\n retried {} times ({:?})",
e,
MAX_RETRIES,
start_t.elapsed()
),
Err(_) => {
tracing::trace!("waiting...");
tokio::time::sleep(patience).await;
std::thread::yield_now();
tracing::trace!("done")
}
}
}
}
.instrument(tracing::trace_span!(
"assert_eventually",
patience = ?patience,
max_retries = MAX_RETRIES,
))
.await
}
}

impl Labels {
pub fn label(mut self, key: impl AsRef<str>, val: impl fmt::Display) -> Self {
self.0.push(format!("{}=\"{}\"", key.as_ref(), val));
self
}

pub fn metric(&self, name: impl Into<String>) -> MetricMatch {
MetricMatch {
labels: self.0.clone(),
name: name.into(),
value: None,
}
}
}

// === impl MatchErr ===

impl fmt::Debug for MatchErr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}

impl fmt::Display for MatchErr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}
17 changes: 11 additions & 6 deletions linkerd/app/integration/src/tests/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,18 @@ mod http2 {
tokio::task::yield_now().await;

// Wait until the proxy has seen the `srv1` disconnect...
assert_eventually_contains!(
metrics.get("/metrics").await,
&format!(
"tcp_close_total{{peer=\"dst\",authority=\"disco.test.svc.cluster.local:{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"\"}} 1",
port
metrics::metric("tcp_close_total")
.label("peer", "dst")
.label("direction", "outbound")
.label("tls", "no_identity")
.label("no_tls_reason", "not_provided_by_service_discovery")
.label(
"authority",
format_args!("disco.test.svc.cluster.local:{}", port),
)
);
.value(1u64)
.assert_in(&metrics)
.await;

// Start a new request to the destination, now that the server is dead.
// This request should be waiting at the balancer for a ready endpoint.
Expand Down
25 changes: 15 additions & 10 deletions linkerd/app/integration/src/tests/profiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,13 @@ async fn retry_uses_budget() {
assert_eq!(res.status(), 533);
},
with_metrics: |metrics: client::Client, port| async move {
let expected = format!(
"route_retryable_total{{direction=\"outbound\",dst=\"profiles.test.svc.cluster.local:{}\",skipped=\"no_budget\"}} 1",
port
);
assert_eventually_contains!(metrics.get("/metrics").await, &expected[..]);
metrics::metric("route_retryable_total")
.label("direction", "outbound")
.label("dst", format_args!("profiles.test.svc.cluster.local:{}", port))
.label("skipped", "no_budget")
.value(1u64)
.assert_in(&metrics)
.await;
}
}
}
Expand Down Expand Up @@ -332,11 +334,14 @@ async fn timeout() {
assert_eq!(res.status(), 504);
},
with_metrics: |metrics: client::Client, port| async move {
let expected = format!(
"route_response_total{{direction=\"outbound\",dst=\"profiles.test.svc.cluster.local:{}\",classification=\"failure\",error=\"timeout\"}} 1",
port
);
assert_eventually_contains!(metrics.get("/metrics").await, &expected[..]);
metrics::metric("route_response_total")
.label("direction", "outbound")
.label("dst", format_args!("profiles.test.svc.cluster.local:{}", port))
.label("classification", "failure")
.label("error", "timeout")
.value(1u64)
.assert_in(&metrics)
.await;
}
}
}
Loading

0 comments on commit ac6e5b2

Please sign in to comment.