Rename Concentrator to SpanConcentrator
Change concentrator name to be consistent with the trace agent
VianneyRuhlmann committed Aug 22, 2024
1 parent d198856 commit a517d6b
Showing 4 changed files with 59 additions and 52 deletions.
2 changes: 1 addition & 1 deletion data-pipeline/src/lib.rs
@@ -1,5 +1,5 @@
 // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
 // SPDX-License-Identifier: Apache-2.0
 
-mod concentrator;
+mod span_concentrator;
 pub mod trace_exporter;
@@ -1,11 +1,10 @@
 // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
 // SPDX-License-Identifier: Apache-2.0
-//! This module implements the Concentrator used to aggregate spans into stats
-#![allow(dead_code)] // TODO: Remove once the trace exporter uses the concentrator
+//! This module implements the SpanConcentrator used to aggregate spans into stats
+#![allow(dead_code)] // TODO: Remove once the trace exporter uses the SpanConcentrator
 use std::collections::HashMap;
 use std::time::{self, Duration, SystemTime};
 
-use anyhow::{anyhow, Result};
 use datadog_trace_protobuf::pb;
 use datadog_trace_utils::trace_utils;
@@ -38,30 +37,38 @@ fn compute_stats_for_span_kind(span: &pb::Span) -> bool {
     })
 }
 
 /// Return true if the span should be ignored for stats computation
 fn should_ignore_span(span: &pb::Span, compute_stats_by_span_kind: bool) -> bool {
     !(trace_utils::has_top_level(span)
         || trace_utils::is_measured(span)
         || (compute_stats_by_span_kind && compute_stats_for_span_kind(span)))
         || trace_utils::is_partial_snapshot(span)
 }
 
-/// The concentrator compute stats on span aggregated by time and span attributes
+/// The SpanConcentrator computes stats on spans aggregated by time and span attributes
+///
+/// # Aggregation
+/// Spans are aggregated into time buckets based on their end_time. Within each time bucket there
+/// is another level of aggregation based on the span's fields (e.g. resource_name, service_name)
+/// and on the peer tags if `peer_tags_aggregation` is enabled.
+///
+/// # Span eligibility
+/// Ingested spans are only aggregated if they are root, top-level, or measured, or if their
+/// `span.kind` is eligible and `compute_stats_by_span_kind` is enabled.
+///
+/// # Flushing
+/// When the SpanConcentrator is flushed it keeps the `buffer_len` most recent buckets and removes
+/// all older buckets, returning their content. When force flushing, all buckets are flushed
+/// regardless of their age.
 #[derive(Debug)]
-pub struct Concentrator {
+pub struct SpanConcentrator {
     /// Size of the time buckets used for aggregation in nanos
     bucket_size: u64,
     buckets: HashMap<u64, StatsBucket>,
     /// Timestamp of the oldest time bucket for which we allow data.
     /// Any ingested stats older than it get added to this bucket.
     oldest_timestamp: u64,
-    /// bufferLen is the number of 10s stats bucket we keep in memory before flushing them.
-    /// It means that we can compute stats only for the last `bufferLen * bsize` and that we
-    /// wait such time before flushing the stats.
-    /// This only applies to past buckets. Stats buckets in the future are allowed with no
-    /// restriction.
+    /// bufferLen is the number of stats buckets we keep when flushing.
    buffer_len: usize,
     /// flag to enable aggregation of peer tags
     peer_tags_aggregation: bool,
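
The eligibility rule in `should_ignore_span` above is easier to read in its positive form. The sketch below is an illustrative restatement, not part of this commit: `is_eligible` and its boolean parameters are hypothetical stand-ins for the `trace_utils` predicates, and the equivalence follows from applying De Morgan's law to the expression in the diff.

fn is_eligible(
    top_level: bool,         // trace_utils::has_top_level(span)
    measured: bool,          // trace_utils::is_measured(span)
    kind_enabled: bool,      // the compute_stats_by_span_kind flag
    kind_eligible: bool,     // compute_stats_for_span_kind(span)
    partial_snapshot: bool,  // trace_utils::is_partial_snapshot(span)
) -> bool {
    // A span contributes to stats iff at least one positive condition holds
    // and it is not a partial snapshot; this is !should_ignore_span(...).
    (top_level || measured || (kind_enabled && kind_eligible)) && !partial_snapshot
}
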
@@ -71,17 +78,22 @@ pub struct Concentrator {
     peer_tag_keys: Vec<String>,
 }
 
-impl Concentrator {
-    /// Return a new concentrator with the given parameter
-    /// - `bucket_size`
+impl SpanConcentrator {
+    /// Return a new concentrator with the given parameters
+    /// - `bucket_size` is the size of the time buckets
+    /// - `now` the current system time, used to define the oldest bucket
+    /// - `peer_tags_aggregation` enables aggregation based on peer_tags
+    /// - `compute_stats_by_span_kind` use span_kind to determine span eligibility for stats
+    ///   computation
+    /// - `peer_tag_keys` the list of keys considered as peer tags for aggregation
     pub fn new(
         bucket_size: Duration,
         now: SystemTime,
         peer_tags_aggregation: bool,
         compute_stats_by_span_kind: bool,
         peer_tag_keys: Vec<String>,
-    ) -> Concentrator {
-        Concentrator {
+    ) -> SpanConcentrator {
+        SpanConcentrator {
             bucket_size: bucket_size.as_nanos() as u64,
             buckets: HashMap::new(),
             oldest_timestamp: align_timestamp(
@@ -95,12 +107,13 @@ impl Concentrator {
         }
     }
 
-    pub fn add_span(&mut self, span: &pb::Span) -> Result<()> {
-        if should_ignore_span(span, self.compute_stats_by_span_kind) {
-            return Ok(()); // Span is ignored
-        }
-        if let Ok(end_time) = u64::try_from(span.start + span.duration) {
-            let mut bucket_timestamp = align_timestamp(end_time, self.bucket_size);
+    /// Add a span into the concentrator, computing stats if the span is eligible for stats
+    /// computation.
+    pub fn add_span(&mut self, span: &pb::Span) {
+        // If the span is eligible for stats computation
+        if !should_ignore_span(span, self.compute_stats_by_span_kind) {
+            let mut bucket_timestamp =
+                align_timestamp((span.start + span.duration) as u64, self.bucket_size);
             // If the span is too old we aggregate it in the oldest allowed bucket instead of
             // creating a new one
             if bucket_timestamp < self.oldest_timestamp {
@@ -114,13 +127,11 @@ impl Concentrator {
                 .entry(bucket_timestamp)
                 .or_insert(StatsBucket::new(bucket_timestamp))
                 .insert(agg_key, span);
-
-            Ok(())
-        } else {
-            Err(anyhow!("Invalid span endtime"))
         }
     }
 
+    /// Flush all stats buckets except the `buffer_len` most recent. If `force` is true, flush
+    /// all buckets.
     pub fn flush(&mut self, now: SystemTime, force: bool) -> Vec<pb::ClientStatsBucket> {
         // TODO: Use drain filter from hashbrown to avoid removing current buckets
         let now_timestamp = system_time_to_unix_duration(now).as_nanos() as u64;
@@ -139,7 +150,7 @@
             // but we delay flushing by at most `bufferLen` buckets.
             // This delay might result in not flushing stats payload (data loss)
             // if the tracer stops while the latest buckets aren't old enough to be flushed.
-            // The "force" boolean skips the delay and flushes all buckets, typically on agent
+            // The "force" boolean skips the delay and flushes all buckets, typically on
             // shutdown.
             if !force && timestamp > (now_timestamp - self.buffer_len as u64 * self.bucket_size)
             {
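
Taken together, the renamed API is `SpanConcentrator::new`, an `add_span` that no longer returns a `Result` (ineligible spans are now skipped silently instead of reported as errors), and `flush`. Below is a minimal usage sketch under stated assumptions: the `stats_for` wrapper is hypothetical, `pb::Span` values are assumed to come from the caller, and the 10s bucket size follows the trace-agent convention mentioned in the removed comment.

use std::time::{Duration, SystemTime};
use datadog_trace_protobuf::pb;

fn stats_for(spans: &[pb::Span]) -> Vec<pb::ClientStatsBucket> {
    // bucket size, reference time, peer-tags aggregation off,
    // span.kind-based eligibility off, no peer tag keys
    let mut concentrator = SpanConcentrator::new(
        Duration::from_secs(10),
        SystemTime::now(),
        false,
        false,
        vec![],
    );
    for span in spans {
        concentrator.add_span(span); // ineligible spans are simply ignored
    }
    // force = true drains every bucket regardless of age (as on shutdown);
    // force = false keeps the `buffer_len` most recent buckets buffered
    concentrator.flush(SystemTime::now(), true)
}
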
@@ -84,12 +84,12 @@ fn assert_counts_equal(expected: Vec<pb::ClientGroupedStats>, actual: Vec<pb::Cl
     assert_eq!(expected_map, actual_map)
 }
 
-/// Test that the concentrator does not create buckets older than the exporter initialization
+/// Test that the SpanConcentrator does not create buckets older than the exporter initialization
 #[test]
 fn test_concentrator_oldest_timestamp_cold() {
     let now = SystemTime::now();
     let mut concentrator =
-        Concentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, false, vec![]);
+        SpanConcentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, false, vec![]);
     let mut spans = vec![
         get_test_span(now, 1, 0, 50, 5, "A1", "resource1", 0),
         get_test_span(now, 1, 0, 40, 4, "A1", "resource1", 0),
@@ -100,7 +100,7 @@ fn test_concentrator_oldest_timestamp_cold() {
     ];
     compute_top_level_span(spans.as_mut_slice());
     for span in &spans {
-        concentrator.add_span(span).expect("Failed to add span");
+        concentrator.add_span(span);
     }
 
     let mut flushtime = now;
@@ -133,13 +133,13 @@ fn test_concentrator_oldest_timestamp_cold() {
     assert_counts_equal(expected, stats.first().unwrap().stats.clone());
 }
 
-/// Test that the concentrator does not create buckets older than the exporter initialization
+/// Test that the SpanConcentrator does not create buckets older than the exporter initialization
 /// with multiple active buckets
 #[test]
 fn test_concentrator_oldest_timestamp_hot() {
     let now = SystemTime::now();
     let mut concentrator =
-        Concentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, false, vec![]);
+        SpanConcentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, false, vec![]);
     let mut spans = vec![
         get_test_span(now, 1, 0, 50, 5, "A1", "resource1", 0),
         get_test_span(now, 1, 0, 40, 4, "A1", "resource1", 0),
@@ -158,7 +158,7 @@ fn test_concentrator_oldest_timestamp_hot() {
         * concentrator.bucket_size;
 
     for span in &spans {
-        concentrator.add_span(span).expect("Failed to add span");
+        concentrator.add_span(span);
     }
 
     for _ in 0..(concentrator.buffer_len - 1) {
@@ -206,13 +206,13 @@ fn test_concentrator_oldest_timestamp_hot() {
     assert_counts_equal(expected, stats.first().unwrap().stats.clone());
 }
 
-/// TestConcentratorStatsTotals tests that the total stats are correct, independently of the
+/// Tests that the total stats are correct, independently of the
 /// time bucket they end up.
 #[test]
 fn test_concentrator_stats_totals() {
     let now = SystemTime::now();
     let mut concentrator =
-        Concentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, false, vec![]);
+        SpanConcentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, false, vec![]);
     let aligned_now = align_timestamp(
         system_time_to_unix_duration(now).as_nanos() as u64,
         concentrator.bucket_size,
@@ -239,7 +239,7 @@ fn test_concentrator_stats_totals() {
     let mut total_top_level_hits = 0;
 
     for span in &spans {
-        concentrator.add_span(span).expect("Failed to add span");
+        concentrator.add_span(span);
     }
 
     let mut flushtime = now;
@@ -272,7 +272,7 @@
 fn test_concentrator_stats_counts() {
     let now = SystemTime::now();
     let mut concentrator =
-        Concentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, false, vec![]);
+        SpanConcentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, false, vec![]);
     let aligned_now = align_timestamp(
         system_time_to_unix_duration(now).as_nanos() as u64,
         concentrator.bucket_size,
@@ -482,7 +482,7 @@ fn test_concentrator_stats_counts() {
     compute_top_level_span(spans.as_mut_slice());
 
     for span in &spans {
-        concentrator.add_span(span).expect("Failed to add span");
+        concentrator.add_span(span);
     }
 
     let mut flushtime = now;
@@ -563,9 +563,9 @@ fn test_span_should_be_included_in_stats() {
     ];
     compute_top_level_span(spans.as_mut_slice());
     let mut concentrator =
-        Concentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, true, vec![]);
+        SpanConcentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, true, vec![]);
     for span in &spans {
-        concentrator.add_span(span).expect("Failed to add span");
+        concentrator.add_span(span);
     }
 
     let expected = vec![
@@ -637,9 +637,9 @@ fn test_ignore_partial_spans() {
         .insert("_dd.partial_version".to_string(), 830604.0);
     compute_top_level_span(spans.as_mut_slice());
     let mut concentrator =
-        Concentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, true, vec![]);
+        SpanConcentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, true, vec![]);
     for span in &spans {
-        concentrator.add_span(span).expect("Failed to add span");
+        concentrator.add_span(span);
     }
 
     let stats = concentrator.flush(
@@ -656,9 +656,9 @@ fn test_force_flush() {
     let mut spans = vec![get_test_span(now, 1, 0, 50, 5, "A1", "resource1", 0)];
     compute_top_level_span(spans.as_mut_slice());
     let mut concentrator =
-        Concentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, true, vec![]);
+        SpanConcentrator::new(Duration::from_nanos(BUCKET_SIZE), now, false, true, vec![]);
     for span in &spans {
-        concentrator.add_span(span).expect("Failed to add span");
+        concentrator.add_span(span);
     }
 
     // flushtime is 1h before now to make sure the bucket is not old enough to be flushed
@@ -733,23 +733,19 @@ fn test_peer_tags_aggregation() {
     ];
     compute_top_level_span(spans.as_mut_slice());
     let mut concentrator_without_peer_tags =
-        Concentrator::new(Duration::from_nanos(BUCKET_SIZE), now, true, true, vec![]);
-    let mut concentrator_with_peer_tags = Concentrator::new(
+        SpanConcentrator::new(Duration::from_nanos(BUCKET_SIZE), now, true, true, vec![]);
+    let mut concentrator_with_peer_tags = SpanConcentrator::new(
         Duration::from_nanos(BUCKET_SIZE),
         now,
         true,
         false,
         vec!["db.instance".to_string(), "db.system".to_string()],
     );
     for span in &spans {
-        concentrator_without_peer_tags
-            .add_span(span)
-            .expect("Failed to add span");
+        concentrator_without_peer_tags.add_span(span);
     }
     for span in &spans {
-        concentrator_with_peer_tags
-            .add_span(span)
-            .expect("Failed to add span");
+        concentrator_with_peer_tags.add_span(span);
     }
 
     let flushtime = now
