diff --git a/Cargo.lock b/Cargo.lock index 0dd55570dd..f58fb50797 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1307,6 +1307,7 @@ dependencies = [ "ddtelemetry", "dogstatsd-client", "either", + "hashbrown 0.15.1", "http", "http-body-util", "httpmock", diff --git a/data-pipeline/Cargo.toml b/data-pipeline/Cargo.toml index b3c2f50691..78e4c2402e 100644 --- a/data-pipeline/Cargo.toml +++ b/data-pipeline/Cargo.toml @@ -30,6 +30,7 @@ tokio = { version = "1.23", features = [ ], default-features = false } uuid = { version = "1.10.0", features = ["v4"] } tokio-util = "0.7.11" +hashbrown = { version = "0.15" } ddcommon = { path = "../ddcommon", default-features = false } ddtelemetry = { path = "../ddtelemetry", default-features = false } diff --git a/data-pipeline/src/span_concentrator/aggregation.rs b/data-pipeline/src/span_concentrator/aggregation.rs index a59bcbf4fb..ad75a65d37 100644 --- a/data-pipeline/src/span_concentrator/aggregation.rs +++ b/data-pipeline/src/span_concentrator/aggregation.rs @@ -7,34 +7,15 @@ use datadog_trace_protobuf::pb; use datadog_trace_utils::span::trace_utils; use datadog_trace_utils::span::Span; use datadog_trace_utils::span::SpanText; -use std::borrow::Borrow; -use std::borrow::Cow; -use std::collections::HashMap; +use hashbrown::HashMap; const TAG_STATUS_CODE: &str = "http.status_code"; const TAG_SYNTHETICS: &str = "synthetics"; const TAG_SPANKIND: &str = "span.kind"; const TAG_ORIGIN: &str = "_dd.origin"; -/// This struct represent the key used to group spans together to compute stats. -#[derive(Debug, Hash, PartialEq, Eq, Clone, Default)] -pub(super) struct AggregationKey<'a> { - resource_name: Cow<'a, str>, - service_name: Cow<'a, str>, - operation_name: Cow<'a, str>, - span_type: Cow<'a, str>, - span_kind: Cow<'a, str>, - http_status_code: u32, - is_synthetics_request: bool, - peer_tags: Vec<(Cow<'a, str>, Cow<'a, str>)>, - is_trace_root: bool, - http_method: Cow<'a, str>, - http_endpoint: Cow<'a, str>, -} - -/// Common representation of AggregationKey used to compare AggregationKey with different lifetimes -/// field order must be the same as in AggregationKey, o/wise hashes will be different #[derive(Clone, Hash, PartialEq, Eq)] +/// Represent a stats aggregation key borrowed from span data pub(super) struct BorrowedAggregationKey<'a> { resource_name: &'a str, service_name: &'a str, @@ -49,70 +30,73 @@ pub(super) struct BorrowedAggregationKey<'a> { http_endpoint: &'a str, } -/// Trait used to define a common type (`dyn BorrowableAggregationKey`) for all AggregationKey -/// regardless of lifetime. -/// This allows an hashmap with `AggregationKey<'static>` keys to lookup an entry with a -/// `AggregationKey<'a>`. -/// This is required because the `get_mut` method of Hashmap requires an input type `Q` such that -/// the key type `K` implements `Borrow`. Since `AggregationKey<'static>` cannot implement -/// `Borrow>` we use `dyn BorrowableAggregationKey` as a placeholder. -trait BorrowableAggregationKey { - fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey<'_>; -} - -impl BorrowableAggregationKey for AggregationKey<'_> { - fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey<'_> { - BorrowedAggregationKey { - resource_name: self.resource_name.borrow(), - service_name: self.service_name.borrow(), - operation_name: self.operation_name.borrow(), - span_type: self.span_type.borrow(), - span_kind: self.span_kind.borrow(), - http_status_code: self.http_status_code, - is_synthetics_request: self.is_synthetics_request, - peer_tags: self +impl hashbrown::Equivalent for BorrowedAggregationKey<'_> { + #[inline] + fn equivalent(&self, key: &OwnedAggregationKey) -> bool { + self.resource_name == key.resource_name + && self.service_name == key.service_name + && self.operation_name == key.operation_name + && self.span_type == key.span_type + && self.span_kind == key.span_kind + && self.http_status_code == key.http_status_code + && self.is_synthetics_request == key.is_synthetics_request + && self.peer_tags.len() == key.peer_tags.len() + && self .peer_tags .iter() - .map(|(tag, value)| (tag.borrow(), value.borrow())) - .collect(), - is_trace_root: self.is_trace_root, - http_method: self.http_method.borrow(), - http_endpoint: self.http_endpoint.borrow(), - } + .zip(key.peer_tags.iter()) + .all(|((k1, v1), (k2, v2))| k1 == k2 && v1 == v2) + && self.is_trace_root == key.is_trace_root + && self.http_method == key.http_method + && self.http_endpoint == key.http_endpoint } } -impl BorrowableAggregationKey for BorrowedAggregationKey<'_> { - fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey<'_> { - self.clone() - } -} - -impl<'a, 'b> Borrow for AggregationKey<'a> -where - 'a: 'b, -{ - fn borrow(&self) -> &(dyn BorrowableAggregationKey + 'b) { - self - } -} - -impl Eq for dyn BorrowableAggregationKey + '_ {} - -impl PartialEq for dyn BorrowableAggregationKey + '_ { - fn eq(&self, other: &dyn BorrowableAggregationKey) -> bool { - self.borrowed_aggregation_key() - .eq(&other.borrowed_aggregation_key()) - } +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Default)] +/// Represents a span aggregation key with owned data +/// +/// To be able to use BorrowedAggregationKey to index into a stats bucket hashmap two +/// conditions must stay true: +/// * Hashing an owned key derived from a borrowed key should produce the same hash as hashing the +/// borrowed key +/// * Running the Equivalent trait on an owned key derived from a borrowed key should produce true +pub(super) struct OwnedAggregationKey { + resource_name: String, + service_name: String, + operation_name: String, + span_type: String, + span_kind: String, + http_status_code: u32, + is_synthetics_request: bool, + peer_tags: Vec<(String, String)>, + is_trace_root: bool, + http_method: String, + http_endpoint: String, } -impl std::hash::Hash for dyn BorrowableAggregationKey + '_ { - fn hash(&self, state: &mut H) { - self.borrowed_aggregation_key().hash(state) +impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey { + fn from(value: &BorrowedAggregationKey<'_>) -> Self { + OwnedAggregationKey { + resource_name: value.resource_name.to_owned(), + service_name: value.service_name.to_owned(), + operation_name: value.operation_name.to_owned(), + span_type: value.span_type.to_owned(), + span_kind: value.span_kind.to_owned(), + http_status_code: value.http_status_code, + is_synthetics_request: value.is_synthetics_request, + peer_tags: value + .peer_tags + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + is_trace_root: value.is_trace_root, + http_method: value.http_method.to_owned(), + http_endpoint: value.http_endpoint.to_owned(), + } } } -impl<'a> AggregationKey<'a> { +impl<'a> BorrowedAggregationKey<'a> { /// Return an AggregationKey matching the given span. /// /// If `peer_tags_keys` is not empty then the peer tags of the span will be included in the @@ -146,57 +130,32 @@ impl<'a> AggregationKey<'a> { .unwrap_or_default(); Self { - resource_name: span.resource.borrow().into(), - service_name: span.service.borrow().into(), - operation_name: span.name.borrow().into(), - span_type: span.r#type.borrow().into(), - span_kind: span_kind.into(), + resource_name: span.resource.borrow(), + service_name: span.service.borrow(), + operation_name: span.name.borrow(), + span_type: span.r#type.borrow(), + span_kind, http_status_code: get_status_code(span), - http_method: http_method.into(), - http_endpoint: http_endpoint.into(), is_synthetics_request: span .meta .get(TAG_ORIGIN) .is_some_and(|origin| origin.borrow().starts_with(TAG_SYNTHETICS)), + peer_tags, is_trace_root: span.parent_id == 0, - peer_tags: peer_tags - .into_iter() - .map(|(k, v)| (k.into(), v.borrow().into())) - .collect(), - } - } - - /// Clone the fields of an AggregationKey to produce a static version of the key which is - /// not tied to the lifetime of a span. - pub(super) fn into_static_key(self) -> AggregationKey<'static> { - AggregationKey { - resource_name: Cow::Owned(self.resource_name.into_owned()), - service_name: Cow::Owned(self.service_name.into_owned()), - operation_name: Cow::Owned(self.operation_name.into_owned()), - span_type: Cow::Owned(self.span_type.into_owned()), - span_kind: Cow::Owned(self.span_kind.into_owned()), - http_status_code: self.http_status_code, - http_method: Cow::Owned(self.http_method.into_owned()), - http_endpoint: Cow::Owned(self.http_endpoint.into_owned()), - is_synthetics_request: self.is_synthetics_request, - is_trace_root: self.is_trace_root, - peer_tags: self - .peer_tags - .into_iter() - .map(|(key, value)| (Cow::from(key.into_owned()), Cow::from(value.into_owned()))) - .collect(), + http_method, + http_endpoint, } } } -impl From for AggregationKey<'static> { +impl From for OwnedAggregationKey { fn from(value: pb::ClientGroupedStats) -> Self { Self { - resource_name: value.resource.into(), - service_name: value.service.into(), - operation_name: value.name.into(), - span_type: value.r#type.into(), - span_kind: value.span_kind.into(), + resource_name: value.resource, + service_name: value.service, + operation_name: value.name, + span_type: value.r#type, + span_kind: value.span_kind, http_status_code: value.http_status_code, is_synthetics_request: value.synthetics, peer_tags: value @@ -204,12 +163,12 @@ impl From for AggregationKey<'static> { .into_iter() .filter_map(|t| { let (key, value) = t.split_once(':')?; - Some((key.to_string().into(), value.to_string().into())) + Some((key.to_string(), value.to_string())) }) .collect(), is_trace_root: value.is_trace_root == 1, - http_method: value.http_method.into(), - http_endpoint: value.http_endpoint.into(), + http_method: value.http_method, + http_endpoint: value.http_endpoint, } } } @@ -241,13 +200,16 @@ where /// Parse the meta tags of a span and return a list of the peer tags based on the list of /// `peer_tag_keys` -fn get_peer_tags<'k, 'v, T>(span: &'v Span, peer_tag_keys: &'k [String]) -> Vec<(&'k str, &'v T)> +fn get_peer_tags<'k, 'v, T>( + span: &'v Span, + peer_tag_keys: &'k [String], +) -> Vec<(&'k str, &'v str)> where T: SpanText, { peer_tag_keys .iter() - .filter_map(|key| Some((key.as_str(), span.meta.get(key.as_str())?))) + .filter_map(|key| Some(((key.as_str()), (span.meta.get(key.as_str())?.borrow())))) .collect() } @@ -287,7 +249,7 @@ impl GroupedStats { /// spans aggregated on their AggregationKey. #[derive(Debug, Clone)] pub(super) struct StatsBucket { - data: HashMap, GroupedStats>, + data: HashMap, start: u64, } @@ -302,17 +264,11 @@ impl StatsBucket { /// Insert a value as stats in the group corresponding to the aggregation key, if it does /// not exist it creates it. - pub(super) fn insert(&mut self, key: AggregationKey<'_>, value: &Span) + pub(super) fn insert(&mut self, key: BorrowedAggregationKey<'_>, value: &Span) where T: SpanText, { - if let Some(grouped_stats) = self.data.get_mut(&key as &dyn BorrowableAggregationKey) { - grouped_stats.insert(value); - } else { - let mut grouped_stats = GroupedStats::default(); - grouped_stats.insert(value); - self.data.insert(key.into_static_key(), grouped_stats); - } + self.data.entry_ref(&key).or_default().insert(value); } /// Consume the bucket and return a ClientStatsBucket containing the bucket stats. @@ -333,13 +289,13 @@ impl StatsBucket { } /// Create a ClientGroupedStats struct based on the given AggregationKey and GroupedStats -fn encode_grouped_stats(key: AggregationKey, group: GroupedStats) -> pb::ClientGroupedStats { +fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::ClientGroupedStats { pb::ClientGroupedStats { - service: key.service_name.into_owned(), - name: key.operation_name.into_owned(), - resource: key.resource_name.into_owned(), + service: key.service_name, + name: key.operation_name, + resource: key.resource_name, http_status_code: key.http_status_code, - r#type: key.span_type.into_owned(), + r#type: key.span_type, db_type: String::new(), // db_type is not used yet (see proto definition) hits: group.hits, @@ -350,7 +306,7 @@ fn encode_grouped_stats(key: AggregationKey, group: GroupedStats) -> pb::ClientG error_summary: group.error_summary.encode_to_vec(), synthetics: key.is_synthetics_request, top_level_hits: group.top_level_hits, - span_kind: key.span_kind.into_owned(), + span_kind: key.span_kind, peer_tags: key .peer_tags @@ -362,8 +318,8 @@ fn encode_grouped_stats(key: AggregationKey, group: GroupedStats) -> pb::ClientG } else { pb::Trilean::False.into() }, - http_method: key.http_method.into_owned(), - http_endpoint: key.http_endpoint.into_owned(), + http_method: key.http_method, + http_endpoint: key.http_endpoint, grpc_status_code: String::new(), // currently not used } } @@ -373,10 +329,18 @@ mod tests { use datadog_trace_utils::span::{SpanBytes, SpanSlice}; use super::*; + use std::{collections::HashMap, hash::Hash}; + + fn get_hash(v: &impl Hash) -> u64 { + use std::hash::Hasher; + let mut hasher = std::hash::DefaultHasher::new(); + v.hash(&mut hasher); + hasher.finish() + } #[test] fn test_aggregation_key_from_span() { - let test_cases: Vec<(SpanBytes, AggregationKey)> = vec![ + let test_cases: Vec<(SpanBytes, OwnedAggregationKey)> = vec![ // Root span ( SpanBytes { @@ -387,7 +351,7 @@ mod tests { parent_id: 0, ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -406,7 +370,7 @@ mod tests { meta: HashMap::from([("span.kind".into(), "client".into())]), ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -429,7 +393,7 @@ mod tests { ]), ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -454,7 +418,7 @@ mod tests { ]), ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -480,7 +444,7 @@ mod tests { ]), ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -500,7 +464,7 @@ mod tests { meta: HashMap::from([("_dd.origin".into(), "synthetics-browser".into())]), ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -520,7 +484,7 @@ mod tests { meta: HashMap::from([("http.status_code".into(), "418".into())]), ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -541,7 +505,7 @@ mod tests { meta: HashMap::from([("http.status_code".into(), "x".into())]), ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -561,7 +525,7 @@ mod tests { metrics: HashMap::from([("http.status_code".into(), 418.0)]), ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -585,7 +549,7 @@ mod tests { ]), ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "GET /api/v1/users".into(), @@ -611,7 +575,7 @@ mod tests { ]), ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "POST /users/create".into(), @@ -630,7 +594,7 @@ mod tests { "db.system".to_string(), ]; - let test_cases_with_peer_tags: Vec<(SpanSlice, AggregationKey)> = vec![ + let test_cases_with_peer_tags: Vec<(SpanSlice, OwnedAggregationKey)> = vec![ // Span with peer tags with peertags aggregation enabled ( SpanSlice { @@ -642,7 +606,7 @@ mod tests { meta: HashMap::from([("span.kind", "client"), ("aws.s3.bucket", "bucket-a")]), ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -668,7 +632,7 @@ mod tests { ]), ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -699,7 +663,7 @@ mod tests { ]), ..Default::default() }, - AggregationKey { + OwnedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -711,13 +675,20 @@ mod tests { ]; for (span, expected_key) in test_cases { - assert_eq!(AggregationKey::from_span(&span, &[]), expected_key); + let borrowed_key = BorrowedAggregationKey::from_span(&span, &[]); + assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key); + assert_eq!( + get_hash(&borrowed_key), + get_hash(&OwnedAggregationKey::from(&borrowed_key)) + ); } for (span, expected_key) in test_cases_with_peer_tags { + let borrowed_key = BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice()); + assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key); assert_eq!( - AggregationKey::from_span(&span, test_peer_tags.as_slice()), - expected_key + get_hash(&borrowed_key), + get_hash(&OwnedAggregationKey::from(&borrowed_key)) ); } } diff --git a/data-pipeline/src/span_concentrator/mod.rs b/data-pipeline/src/span_concentrator/mod.rs index 7f049c3b51..02332d7b5b 100644 --- a/data-pipeline/src/span_concentrator/mod.rs +++ b/data-pipeline/src/span_concentrator/mod.rs @@ -7,7 +7,7 @@ use std::time::{self, Duration, SystemTime}; use datadog_trace_protobuf::pb; use datadog_trace_utils::span::{trace_utils, Span, SpanText}; -use aggregation::{AggregationKey, StatsBucket}; +use aggregation::{BorrowedAggregationKey, StatsBucket}; mod aggregation; @@ -133,7 +133,7 @@ impl SpanConcentrator { bucket_timestamp = self.oldest_timestamp; } - let agg_key = AggregationKey::from_span(span, self.peer_tag_keys.as_slice()); + let agg_key = BorrowedAggregationKey::from_span(span, self.peer_tag_keys.as_slice()); self.buckets .entry(bucket_timestamp) diff --git a/data-pipeline/src/span_concentrator/tests.rs b/data-pipeline/src/span_concentrator/tests.rs index 7cd52ae724..318a2780fe 100644 --- a/data-pipeline/src/span_concentrator/tests.rs +++ b/data-pipeline/src/span_concentrator/tests.rs @@ -1,6 +1,8 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::span_concentrator::aggregation::OwnedAggregationKey; + use super::*; use datadog_trace_utils::span::{trace_utils::compute_top_level_span, SpanSlice}; use rand::{thread_rng, Rng}; @@ -83,12 +85,12 @@ fn assert_counts_equal(expected: Vec, actual: Vec