Fix clippy warnings
VianneyRuhlmann committed Aug 5, 2024
1 parent 6ea500f commit fabcd74
Showing 2 changed files with 32 additions and 41 deletions.
23 changes: 7 additions & 16 deletions data-pipeline/src/concentrator/aggregation.rs
@@ -35,15 +35,15 @@ impl AggregationKey
     pub fn from_span(
         span: &pb::Span,
         peer_tags_aggregation: bool,
-        peer_tag_keys: &Vec<String>,
+        peer_tag_keys: &[String],
     ) -> Self {
         let span_kind = span
             .meta
             .get(TAG_SPANKIND)
             .map(|s| s.to_string())
             .unwrap_or_default();
         let peer_tags = if peer_tags_aggregation && client_or_producer(&span_kind) {
-            get_peer_tags(&span, peer_tag_keys)
+            get_peer_tags(span, peer_tag_keys)
        } else {
            vec![]
        };
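Two clippy fixes appear in this hunk: `ptr_arg` (take `&[String]` rather than `&Vec<String>`, since a shared reference to a `Vec` offers nothing a slice doesn't) and `needless_borrow` (`span` is already a reference, so `&span` added a second layer). A minimal sketch of the slice-argument pattern, with illustrative names not taken from this crate:

```rust
// Accepting a slice is strictly more general than accepting &Vec:
// callers can pass a Vec, a fixed-size array, or a sub-slice.
fn first_with_prefix(keys: &[String], prefix: &str) -> Option<&String> {
    keys.iter().find(|k| k.starts_with(prefix))
}

fn main() {
    let keys: Vec<String> = vec!["peer.service".into(), "db.name".into()];
    // A &Vec<String> coerces to &[String] via deref coercion.
    assert_eq!(first_with_prefix(&keys, "db"), Some(&"db.name".to_string()));
    // An empty slice literal also works, where &Vec would require an allocation.
    assert_eq!(first_with_prefix(&[], "db"), None);
}
```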
@@ -77,18 +77,15 @@ fn get_status_code(span: &pb::Span) -> u32 {
 
 /// Return true if the span kind is "client" or "producer"
 fn client_or_producer(span_kind: &str) -> bool {
-    match span_kind.to_lowercase().as_str() {
-        "client" | "producer" => true,
-        _ => false,
-    }
+    matches!(span_kind.to_lowercase().as_str(), "client" | "producer")
 }
 
 /// Parse the meta tags of a span and return a list of the peer tags based on the list of
 /// `peer_tag_keys`
-fn get_peer_tags(span: &pb::Span, peer_tag_keys: &Vec<String>) -> Vec<Tag> {
+fn get_peer_tags(span: &pb::Span, peer_tag_keys: &[String]) -> Vec<Tag> {
     peer_tag_keys
         .iter()
-        .filter_map(|key| Some(Tag::new(key, span.meta.get(key)?).ok()?))
+        .filter_map(|key| Tag::new(key, span.meta.get(key)?).ok())
         .collect()
 }
 
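`matches!` replaces a two-arm `match` whose arms only produce `true` or `false` (clippy's `match_like_matches_macro`), and wrapping `expr?` back in `Some(...)` inside `filter_map` is redundant (what clippy flags as `needless_question_mark`). A self-contained sketch of the `matches!` rewrite, with hypothetical states:

```rust
// Before: a match used purely as a boolean predicate.
fn is_terminal_verbose(state: &str) -> bool {
    match state {
        "done" | "failed" => true,
        _ => false,
    }
}

// After: matches! makes the predicate a one-liner.
fn is_terminal(state: &str) -> bool {
    matches!(state, "done" | "failed")
}

fn main() {
    assert!(is_terminal("done"));
    assert_eq!(is_terminal("running"), is_terminal_verbose("running"));
}
```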
@@ -141,10 +138,7 @@ impl StatsBucket {
     /// Insert a value as stats in the group corresponding to the aggregation key, if it does
     /// not exist it creates it.
     pub fn insert(&mut self, key: AggregationKey, value: &pb::Span) {
-        self.data
-            .entry(key)
-            .or_insert(GroupedStats::default())
-            .insert(value);
+        self.data.entry(key).or_default().insert(value);
     }
 
     /// Consume the bucket and return a ClientStatsBucket containing the bucket stats.
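`or_insert(GroupedStats::default())` constructs the default value on every call, even when the entry is already occupied, because the argument is evaluated eagerly; `or_default()` only constructs it for a vacant entry and reads more directly (clippy suggests this rewrite for map entries). A sketch with a plain `HashMap` rather than the crate's `GroupedStats`:

```rust
use std::collections::HashMap;

fn main() {
    let mut counts: HashMap<String, u64> = HashMap::new();
    for word in ["trace", "span", "trace"] {
        // or_default() inserts u64::default() (0) only if the key is vacant;
        // or_insert(u64::default()) would build the value on every iteration.
        *counts.entry(word.to_string()).or_default() += 1;
    }
    assert_eq!(counts["trace"], 2);
    assert_eq!(counts["span"], 1);
}
```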
@@ -490,10 +484,7 @@ mod tests {
         ];
 
         for (span, expected_key) in test_cases {
-            assert_eq!(
-                AggregationKey::from_span(&span, false, &vec![]),
-                expected_key
-            );
+            assert_eq!(AggregationKey::from_span(&span, false, &[]), expected_key);
         }
 
         for (span, expected_key) in test_cases_with_peer_tags {
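The test change pairs with the new `&[String]` signature: `&vec![]` heap-allocates an empty `Vec` just to borrow it, while `&[]` is an allocation-free slice literal (clippy's `useless_vec`). Illustrative sketch:

```rust
fn count_keys(keys: &[String]) -> usize {
    keys.len()
}

fn main() {
    // Before: builds a Vec<String> on the heap only to take a reference.
    assert_eq!(count_keys(&vec![]), 0);
    // After: borrows a static empty slice, no allocation at all.
    assert_eq!(count_keys(&[]), 0);
}
```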
50 changes: 25 additions & 25 deletions data-pipeline/src/concentrator/mod.rs
@@ -1,6 +1,7 @@
 // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
 // SPDX-License-Identifier: Apache-2.0
-//! This module implements the SpanConcentrator used to aggregate spans into stats
+//! This module implements the Concentrator used to aggregate spans into stats
+#![allow(dead_code)] // TODO: Remove once the trace exporter uses the concentrator
 use std::collections::HashMap;
 use std::time::{self, Duration, SystemTime};
 
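The added `#![allow(dead_code)]` is an inner attribute, so it silences dead-code warnings for the entire module until the TODO is resolved, rather than for a single item. A short sketch of the scoping difference (items here are hypothetical):

```rust
mod staging {
    // Inner attribute (#!): applies to everything in this module.
    #![allow(dead_code)]

    pub fn not_wired_up_yet() {} // no dead-code warning
}

// Outer attribute (#): applies only to the item it annotates.
#[allow(dead_code)]
fn also_unused() {}

fn main() {}
```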
@@ -29,12 +30,12 @@ fn align_timestamp(t: u64, bucket_size: u64) -> u64 {
 
 /// Return true if the span has a span.kind that is eligible for stats computation
 fn compute_stats_for_span_kind(span: &pb::Span) -> bool {
-    span.meta
-        .get("span.kind")
-        .is_some_and(|span_kind| match span_kind.to_lowercase().as_str() {
-            "server" | "consumer" | "client" | "producer" => true,
-            _ => false,
-        })
+    span.meta.get("span.kind").is_some_and(|span_kind| {
+        matches!(
+            span_kind.to_lowercase().as_str(),
+            "server" | "consumer" | "client" | "producer"
+        )
+    })
 }
 
 /// The concentrator compute stats on span aggregated by time and span attributes
@@ -137,9 +138,7 @@ impl Concentrator {
             // if the tracer stops while the latest buckets aren't old enough to be flushed.
             // The "force" boolean skips the delay and flushes all buckets, typically on agent
             // shutdown.
-            if !force
-                && timestamp
-                    > (now_timestamp - self.buffer_len as u64 * self.bucket_size as u64)
+            if !force && timestamp > (now_timestamp - self.buffer_len as u64 * self.bucket_size)
             {
                 self.buckets.insert(timestamp, bucket);
                 return None;
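`bucket_size` is already a `u64` (see the `align_timestamp` signature above and the `Duration::from_nanos(concentrator.bucket_size)` calls in the tests), so `as u64` was a no-op cast, clippy's `unnecessary_cast`; `buffer_len`, a `usize`, still needs its cast. Sketch:

```rust
fn window_nanos(buffer_len: usize, bucket_size: u64) -> u64 {
    // buffer_len genuinely needs the cast (usize -> u64);
    // `bucket_size as u64` would be a cast to its own type and is linted away.
    buffer_len as u64 * bucket_size
}

fn main() {
    assert_eq!(window_nanos(3, 10_000_000_000), 30_000_000_000);
}
```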
@@ -160,10 +159,11 @@ mod tests {
 
     /// Return a random timestamp within the corresponding bucket (now - offset)
     fn get_timestamp_in_bucket(aligned_now: u64, bucket_size: u64, offset: u64) -> u64 {
-        return aligned_now - bucket_size * offset + thread_rng().gen_range(0..BUCKET_SIZE);
+        aligned_now - bucket_size * offset + thread_rng().gen_range(0..BUCKET_SIZE)
     }
 
     /// Create a test span with given attributes
+    #[allow(clippy::too_many_arguments)]
     fn get_test_span(
         now: SystemTime,
         span_id: u64,
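The `return ...;` on the helper's last line is clippy's `needless_return`: a Rust function body already evaluates to its trailing expression, so the keyword and the semicolon both go. Sketch:

```rust
// Before: explicit return as the final statement.
fn double_verbose(x: u64) -> u64 {
    return x * 2;
}

// After: the tail expression is the return value.
fn double(x: u64) -> u64 {
    x * 2
}

fn main() {
    assert_eq!(double(21), double_verbose(21));
}
```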
@@ -192,6 +192,7 @@
         }
     }
 
+    #[allow(clippy::too_many_arguments)]
     fn get_test_span_with_meta(
         now: SystemTime,
         span_id: u64,
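Both test-span builders opt out of clippy's `too_many_arguments`, which fires once a function takes more than seven parameters (the default threshold). For test helpers the `#[allow]` is a pragmatic choice; the usual refactor is a parameter struct or builder. A minimal sketch with a hypothetical helper:

```rust
// Eight parameters exceeds clippy's default threshold of seven,
// so the attribute suppresses the lint for this one function.
#[allow(clippy::too_many_arguments)]
fn make_span_key(
    a: u64, b: u64, c: u64, d: u64,
    e: u64, f: u64, g: u64, h: u64,
) -> u64 {
    a + b + c + d + e + f + g + h
}

fn main() {
    assert_eq!(make_span_key(1, 2, 3, 4, 5, 6, 7, 8), 36);
}
```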
@@ -300,7 +301,7 @@ mod tests {
             is_trace_root: pb::Trilean::True.into(),
             ..Default::default()
         }];
-        assert_counts_equal(expected, stats.get(0).unwrap().stats.clone());
+        assert_counts_equal(expected, stats.first().unwrap().stats.clone());
     }
 
     /// Test that the concentrator does not create buckets older than the exporter initialization
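This and several later hunks swap `stats.get(0)` for `stats.first()` (clippy's `get_first`): both return `Option<&T>`, but `first()` names the intent instead of using a magic index. Sketch:

```rust
fn main() {
    let buckets = vec![10u64, 20, 30];
    assert_eq!(buckets.get(0), buckets.first()); // identical results
    assert_eq!(buckets.first(), Some(&10));

    let empty: Vec<u64> = Vec::new();
    assert_eq!(empty.first(), None); // safe on empty input, no panic
}
```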
@@ -328,12 +329,12 @@
             * concentrator.bucket_size;
 
         for span in &spans {
-            concentrator.add_span(&span).expect("Failed to add span");
+            concentrator.add_span(span).expect("Failed to add span");
         }
 
         for _ in 0..(concentrator.buffer_len - 1) {
             let stats = concentrator.flush(flushtime, false);
-            assert_eq!(stats.len(), 0, "We should get 0 time buckets");
+            assert!(stats.is_empty(), "We should get 0 time buckets");
             flushtime += Duration::from_nanos(concentrator.bucket_size);
         }
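In `for span in &spans`, `span` is already a `&pb::Span`, so `add_span(&span)` passed a double reference that the compiler silently auto-dereferenced; clippy's `needless_borrow` removes the extra `&` (the same fix as `get_peer_tags(&span, ...)` in the first file). Sketch:

```rust
fn describe(n: &u64) -> String {
    format!("value={n}")
}

fn main() {
    let values = vec![1u64, 2, 3];
    for v in &values {
        // v: &u64 already. `describe(&v)` would pass &&u64 and lean on
        // auto-deref; passing v directly is the idiomatic form.
        assert!(!describe(v).is_empty());
    }
}
```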

@@ -354,7 +355,7 @@
             is_trace_root: pb::Trilean::True.into(),
             ..Default::default()
         }];
-        assert_counts_equal(expected, stats.get(0).unwrap().stats.clone());
+        assert_counts_equal(expected, stats.first().unwrap().stats.clone());
 
         flushtime += Duration::from_nanos(concentrator.bucket_size);
         let stats = concentrator.flush(flushtime, false);
@@ -373,7 +374,7 @@
             is_trace_root: pb::Trilean::True.into(),
             ..Default::default()
         }];
-        assert_counts_equal(expected, stats.get(0).unwrap().stats.clone());
+        assert_counts_equal(expected, stats.first().unwrap().stats.clone());
     }
 
     /// TestConcentratorStatsTotals tests that the total stats are correct, independently of the
@@ -416,11 +417,11 @@
 
         for _ in 0..=concentrator.buffer_len {
             let stats = concentrator.flush(flushtime, false);
-            if stats.len() == 0 {
+            if stats.is_empty() {
                 continue;
             }
 
-            for group in &stats.get(0).unwrap().stats {
+            for group in &stats.first().unwrap().stats {
                 total_duration += group.duration;
                 total_hits += group.hits;
                 total_errors += group.errors;
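`stats.len() == 0` and the earlier `assert_eq!(stats.len(), 0, ...)` both become `is_empty()` (clippy's `len_zero`), which states the emptiness check directly. Sketch:

```rust
fn main() {
    let stats: Vec<u64> = Vec::new();
    assert_eq!(stats.len() == 0, stats.is_empty()); // equivalent predicates
    assert!(stats.is_empty());

    let full = vec![1u64];
    assert!(!full.is_empty());
}
```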
@@ -667,21 +668,20 @@ mod tests {
             if expected_counts_by_time
                 .get(&expected_flushed_timestamps)
                 .expect("Unexpected flushed timestamps")
-                .len()
-                == 0
+                .is_empty()
             {
                 // That's a flush for which we expect no data
                 continue;
             }
 
             assert_eq!(stats.len(), 1, "We should get exactly one bucket");
-            assert_eq!(expected_flushed_timestamps, stats.get(0).unwrap().start);
+            assert_eq!(expected_flushed_timestamps, stats.first().unwrap().start);
             assert_counts_equal(
                 expected_counts_by_time
                     .get(&expected_flushed_timestamps)
                     .unwrap()
                     .clone(),
-                stats.get(0).unwrap().stats.clone(),
+                stats.first().unwrap().stats.clone(),
             );
 
             let stats = concentrator.flush(flushtime, false);
@@ -789,7 +789,7 @@ mod tests {
         assert_counts_equal(
             expected,
             stats
-                .get(0)
+                .first()
                 .expect("There should be at least one time bucket")
                 .stats
                 .clone(),
@@ -1010,7 +1010,7 @@ mod tests {
         assert_counts_equal(
             expected_with_peer_tags,
             stats_with_peer_tags
-                .get(0)
+                .first()
                 .expect("There should be at least one time bucket")
                 .stats
                 .clone(),
@@ -1020,7 +1020,7 @@
         assert_counts_equal(
             expected_without_peer_tags,
             stats_without_peer_tags
-                .get(0)
+                .first()
                 .expect("There should be at least one time bucket")
                 .stats
                 .clone(),
