Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mep): Expose new tagging rules interface for metrics extracted from transactions [INGEST-947] [INGEST-541] #1225

Merged
merged 6 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion relay-general/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use normalize::breakdowns::{
get_breakdown_measurements, BreakdownConfig, BreakdownsConfig, SpanOperationsConfig,
};
pub use normalize::normalize_dist;
pub use transactions::validate_timestamps;
pub use transactions::{get_measurement, validate_timestamps};

/// The config for store.
#[derive(Serialize, Deserialize, Debug, Default)]
Expand Down
8 changes: 8 additions & 0 deletions relay-general/src/store/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ use crate::types::{Annotated, Meta, ProcessingAction, ProcessingResult};
/// Rejects transactions based on required fields.
pub struct TransactionsProcessor;

/// Get the value for a measurement, e.g. lcp -> event.measurements.lcp
pub fn get_measurement(transaction: &Event, name: &str) -> Option<f64> {
let measurements = transaction.measurements.value()?;
let annotated = measurements.get(name)?;
let value = annotated.value().and_then(|m| m.value.value())?;
Some(*value)
}

/// Returns start and end timestamps if they are both set and start <= end.
pub fn validate_timestamps(
transaction_event: &Event,
Expand Down
88 changes: 84 additions & 4 deletions relay-sampling/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ use std::net::IpAddr;
use rand::{distributions::Uniform, Rng};
use rand_pcg::Pcg32;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_json::{Number, Value};

use relay_common::{EventType, ProjectKey, Uuid};
use relay_filter::GlobPatterns;
use relay_general::protocol::Event;
use relay_general::store::{get_measurement, validate_timestamps};

/// Defines the type of dynamic rule, i.e. to which type of events it will be applied and how.
#[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq)]
Expand Down Expand Up @@ -115,6 +116,51 @@ impl EqCondition {
}
}

macro_rules! impl_cmp_condition {
($struct_name:ident, $operator:tt) => {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct $struct_name {
pub name: String,
pub value: Number,
}

impl $struct_name {
fn matches_event(&self, event: &Event) -> bool {
self.matches(event)
}
fn matches_trace(&self, trace: &TraceContext) -> bool {
self.matches(trace)
}

fn matches<T: FieldValueProvider>(&self, value_provider: &T) -> bool {
let value = match value_provider.get_value(self.name.as_str()) {
Value::Number(x) => x,
_ => return false
};

// Try various conversion functions in order of expensiveness and likelihood
// - as_i64 is not really fast, but most values in sampling rules can be i64, so we could
// return early
// - f64 is more likely to succeed than u64, but we might lose precision
if let (Some(a), Some(b)) = (value.as_i64(), self.value.as_i64()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be fine but tbh i'm not super confident. I'm trying to think of cases where both values don't have decimal points, but can't be cast to the same int type:

e.g.: self.value is negative, event value is u64::MAX (self.value can't be cast to u64, and event value can't be cast to i64)

but i think in those cases we should be fine with casting those values to floats for comparison...

a $operator b
} else if let (Some(a), Some(b)) = (value.as_u64(), self.value.as_u64()) {
a $operator b
} else if let (Some(a), Some(b)) = (value.as_f64(), self.value.as_f64()) {
a $operator b
} else {
false
}
}
}
}
}

impl_cmp_condition!(GteCondition, >=);
impl_cmp_condition!(LteCondition, <=);
impl_cmp_condition!(LtCondition, <);
impl_cmp_condition!(GtCondition, >);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, would there be a way to do this without a macro? Something like type GtCondition = CmpCondition<std::cmp::PartialOrd::gt>. I guess you would run into problems because you have three different operand types i64, u64, f64 inside?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't find a way, you'd have to be generic over a type, and functions are either:

  • a singular type fn(a: i64, b: i64) -> bool (regular functions)
  • an unnameable type (closures)

there's no const generics for function pointers yet

In order to make generics truly work, you'd have to define a trait Comparator, implement it for a few zero-sized types Lt, Gt, ...:

trait Comparator {
    fn matches(a: Number, b: Number) -> bool;
}

struct Lt;
impl Comparator for Lt { ... }

struct CmpCondition<A> { ... }
impl<A: Comparator> CmpCondition<A> {
    fn matches(self, event: ...) -> bool {
        A::matches(self.number, get_event_number(event))
    }
}

type LtCondition = CmpCondition<Lt>;

but this seemed like more effort

I think instead of being generic over a type what you could do is to write a single non-generic type:

struct CmpCondition {
    cmp_fn: fn(a: Number, b: Number) -> bool;
    ...
}

but then you have to write custom deserialization logic, and serialization just won't work unless you keep the enum variants:

enum RuleCondition {
    Gte(CmpCondition),
    Lt(CmpCondition)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the rundown!


/// A condition that uses glob matching.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobCondition {
Expand Down Expand Up @@ -235,6 +281,10 @@ impl NotCondition {
#[serde(rename_all = "camelCase", tag = "op")]
pub enum RuleCondition {
Eq(EqCondition),
Gte(GteCondition),
Lte(LteCondition),
Lt(LtCondition),
Gt(GtCondition),
Glob(GlobCondition),
Or(OrCondition),
And(AndCondition),
Expand All @@ -252,17 +302,26 @@ impl RuleCondition {
match self {
RuleCondition::Unsupported => false,
// we have a known condition
RuleCondition::Eq(_) | RuleCondition::Glob(_) => true,
RuleCondition::Gte(_)
| RuleCondition::Lte(_)
| RuleCondition::Gt(_)
| RuleCondition::Lt(_)
| RuleCondition::Eq(_)
| RuleCondition::Glob(_) => true,
// dig down for embedded conditions
RuleCondition::And(rules) => rules.supported(),
RuleCondition::Or(rules) => rules.supported(),
RuleCondition::Not(rule) => rule.supported(),
RuleCondition::Custom(_) => true,
}
}
fn matches_event(&self, event: &Event, ip_addr: Option<IpAddr>) -> bool {
pub fn matches_event(&self, event: &Event, ip_addr: Option<IpAddr>) -> bool {
match self {
RuleCondition::Eq(condition) => condition.matches_event(event),
RuleCondition::Lte(condition) => condition.matches_event(event),
RuleCondition::Gte(condition) => condition.matches_event(event),
RuleCondition::Gt(condition) => condition.matches_event(event),
RuleCondition::Lt(condition) => condition.matches_event(event),
RuleCondition::Glob(condition) => condition.matches_event(event),
RuleCondition::And(conditions) => conditions.matches_event(event, ip_addr),
RuleCondition::Or(conditions) => conditions.matches_event(event, ip_addr),
Expand All @@ -271,9 +330,13 @@ impl RuleCondition {
RuleCondition::Custom(condition) => condition.matches_event(event, ip_addr),
}
}
fn matches_trace(&self, trace: &TraceContext, ip_addr: Option<IpAddr>) -> bool {
pub fn matches_trace(&self, trace: &TraceContext, ip_addr: Option<IpAddr>) -> bool {
match self {
RuleCondition::Eq(condition) => condition.matches_trace(trace),
RuleCondition::Gte(condition) => condition.matches_trace(trace),
RuleCondition::Lte(condition) => condition.matches_trace(trace),
RuleCondition::Gt(condition) => condition.matches_trace(trace),
RuleCondition::Lt(condition) => condition.matches_trace(trace),
RuleCondition::Glob(condition) => condition.matches_trace(trace),
RuleCondition::And(conditions) => conditions.matches_trace(trace, ip_addr),
RuleCondition::Or(conditions) => conditions.matches_trace(trace, ip_addr),
Expand Down Expand Up @@ -369,6 +432,23 @@ impl FieldValueProvider for Event {
None => Value::Null,
Some(s) => s.as_str().into(),
},
"transaction.duration" => match (self.ty.value(), validate_timestamps(self)) {
(Some(&EventType::Transaction), Ok((start, end))) => {
let start = start.timestamp_millis();
let end = end.timestamp_millis();

Value::Number(end.saturating_sub(start).into())
}
_ => Value::Null,
},
x if x.starts_with("transaction.measurements.") => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
x if x.starts_with("transaction.measurements.") => {
field_name if field_name.starts_with("transaction.measurements.") => {

let measurement_name = &x["transaction.measurements.".len()..];
if let Some(value) = get_measurement(self, measurement_name) {
value.into()
} else {
Value::Null
}
}
_ => Value::Null,
}
}
Expand Down
6 changes: 6 additions & 0 deletions relay-server/src/actors/envelopes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,11 @@ impl EnvelopeProcessor {
};

let breakdowns_config = state.project_state.config.breakdowns_v2.as_ref();
let conditional_tagging_config = state
.project_state
.config
.metric_conditional_tagging
.as_slice();

if let Some(event) = state.event.value() {
let extracted_anything;
Expand All @@ -1638,6 +1643,7 @@ impl EnvelopeProcessor {
extracted_anything = extract_transaction_metrics(
config,
breakdowns_config,
conditional_tagging_config,
event,
&mut state.extracted_metrics,
);
Expand Down
6 changes: 5 additions & 1 deletion relay-server/src/actors/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::actors::project_cache::{
use crate::envelope::Envelope;
use crate::extractors::RequestMeta;
use crate::metrics_extraction::transactions::TransactionMetricsConfig;
use crate::metrics_extraction::TaggingRule;
use crate::statsd::RelayCounters;
use crate::utils::{EnvelopeLimiter, ErrorBoundary, Response};

Expand Down Expand Up @@ -90,12 +91,14 @@ pub struct ProjectConfig {
/// Configuration for operation breakdown. Will be emitted only if present.
#[serde(skip_serializing_if = "Option::is_none")]
pub breakdowns_v2: Option<BreakdownsConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
/// Configuration in relation to extracting metrics from transaction events.
#[serde(skip_serializing_if = "Option::is_none")]
pub transaction_metrics: Option<ErrorBoundary<TransactionMetricsConfig>>,
/// The span attributes configuration.
#[serde(skip_serializing_if = "BTreeSet::is_empty")]
pub span_attributes: BTreeSet<SpanAttribute>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub metric_conditional_tagging: Vec<TaggingRule>,
/// Exposable features enabled for this project
#[serde(skip_serializing_if = "BTreeSet::is_empty")]
pub features: BTreeSet<Feature>,
Expand All @@ -116,6 +119,7 @@ impl Default for ProjectConfig {
breakdowns_v2: None,
transaction_metrics: None,
span_attributes: BTreeSet::new(),
metric_conditional_tagging: Vec::new(),
features: BTreeSet::new(),
}
}
Expand Down
42 changes: 42 additions & 0 deletions relay-server/src/metrics_extraction/conditional_tagging.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::collections::BTreeSet;

use relay_sampling::RuleCondition;
use serde::{Deserialize, Serialize};
#[cfg(feature = "processing")]
use {relay_general::protocol::Event, relay_metrics::Metric};

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaggingRule {
// note: could add relay_sampling::RuleType here, but right now we only support transaction
// events
pub condition: RuleCondition,
pub target_metrics: BTreeSet<String>,
pub target_tag: String,
pub tag_value: String,
}

#[cfg(feature = "processing")]
pub fn run_conditional_tagging(event: &Event, config: &[TaggingRule], metrics: &mut [Metric]) {
for rule in config {
if !rule.condition.supported()
|| rule.target_metrics.is_empty()
|| !rule.condition.matches_event(event, None)
{
continue;
}

// XXX(slow): this is a double-for-loop, but we extract like 6 metrics per transaction
for metric in &mut *metrics {
if !rule.target_metrics.contains(&metric.name)
|| metric.tags.contains_key(&rule.target_tag)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would add comment here to explain why we skip if the tag already exists.

{
continue;
}

metric
.tags
.insert(rule.target_tag.clone(), rule.tag_value.clone());
}
}
}
3 changes: 3 additions & 0 deletions relay-server/src/metrics_extraction/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
mod conditional_tagging;
pub mod sessions;
pub mod transactions;
mod utils;

pub use conditional_tagging::TaggingRule;
Loading