Skip to content

Commit

Permalink
produce outcomes for metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Nov 17, 2023
1 parent 3db0b08 commit 43d469b
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 24 deletions.
51 changes: 40 additions & 11 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,19 @@ use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::actors::project::ProjectState;
use crate::actors::project_cache::ProjectCache;
use crate::actors::upstream::{SendRequest, UpstreamRelay};
use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType, SourceQuantities};
use crate::extractors::{PartialDsn, RequestMeta};
use crate::metrics_extraction::transactions::types::ExtractMetricsError;
use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor};
use crate::service::ServiceError;
use crate::statsd::{PlatformTag, RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{
self, ChunkedFormDataAggregator, ExtractionMode, FormDataIter, ItemAction, ManagedEnvelope,
SamplingResult,
self, extract_transaction_count, ChunkedFormDataAggregator, ExtractionMode, FormDataIter,
ItemAction, ManagedEnvelope, SamplingResult,
};

use super::test_store::TestStore;

/// The minimum clock drift for correction to apply.
const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

Expand Down Expand Up @@ -580,6 +582,7 @@ struct InnerProcessor {
#[cfg(feature = "processing")]
aggregator: Addr<Aggregator>,
upstream_relay: Addr<UpstreamRelay>,
test_store: Addr<TestStore>,
#[cfg(feature = "processing")]
rate_limiter: Option<RedisRateLimiter>,
geoip_lookup: Option<GeoIpLookup>,
Expand All @@ -596,6 +599,7 @@ impl EnvelopeProcessorService {
project_cache: Addr<ProjectCache>,
global_config: Addr<GlobalConfigManager>,
upstream_relay: Addr<UpstreamRelay>,
test_store: Addr<TestStore>,
#[cfg(feature = "processing")] aggregator: Addr<Aggregator>,
) -> Self {
let geoip_lookup = config.geoip_path().and_then(|p| {
Expand All @@ -620,6 +624,7 @@ impl EnvelopeProcessorService {
global_config,
outcome_aggregator,
upstream_relay,
test_store,
geoip_lookup,
#[cfg(feature = "processing")]
aggregator,
Expand Down Expand Up @@ -3094,12 +3099,14 @@ impl EnvelopeProcessorService {
let mut num_batches = 0;

for batch in BucketsView::new(&buckets).by_size(max_batch_size_bytes) {
#[allow(clippy::redundant_closure_call)]
let mut envelope: ManagedEnvelope = (move || {
let _ = extraction_mode;
let _ = dsn;
todo!("Added by next commit in the PR")
})();
let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
envelope.add_item(create_metrics_item(&batch, extraction_mode));

let mut envelope = ManagedEnvelope::standalone(
envelope,
self.inner.outcome_aggregator.clone(),
self.inner.test_store.clone(),
);
envelope.set_partition_key(partition_key).scope(scoping);

relay_statsd::metric!(
Expand Down Expand Up @@ -3181,6 +3188,26 @@ impl Service for EnvelopeProcessorService {
}
}

/// Builds a [`ItemType::MetricBuckets`] item from a view of metric buckets.
///
/// Every bucket for which `extract_transaction_count` yields a result contributes its count to
/// the item's transaction source quantity; if that result is flagged with `has_profile`, the
/// same count is also added to the profile source quantity. The buckets themselves are
/// serialized as the item's JSON payload.
///
/// # Panics
///
/// Panics if the buckets cannot be serialized to JSON.
fn create_metrics_item(buckets: &BucketsView<'_>, extraction_mode: ExtractionMode) -> Item {
    // Accumulate the number of transactions (and profiles) the buckets were derived from.
    let mut source_quantities = SourceQuantities::default();
    for bucket in buckets.iter() {
        if let Some(summary) = extract_transaction_count(&bucket, extraction_mode) {
            source_quantities.transactions += summary.count;
            if summary.has_profile {
                source_quantities.profiles += summary.count;
            }
        }
    }

    let payload = serde_json::to_vec(&buckets).unwrap();

    let mut item = Item::new(ItemType::MetricBuckets);
    item.set_source_quantities(source_quantities);
    item.set_payload(ContentType::Json, payload);
    item
}

#[cfg(test)]
mod tests {
use std::env;
Expand Down Expand Up @@ -3616,22 +3643,24 @@ mod tests {
let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {});
let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {});
let (global_config, _) = mock_service("global_config", (), |&mut (), _| {});
let (test_store, _) = mock_service("test_store", (), |&mut (), _| {});
#[cfg(feature = "processing")]
let (aggregator, _) = mock_service("aggregator", (), |&mut (), _| {});
let inner = InnerProcessor {
config: Arc::new(config),
envelope_manager,
project_cache,
outcome_aggregator,
#[cfg(feature = "processing")]
aggregator,
upstream_relay,
test_store,
#[cfg(feature = "processing")]
rate_limiter: None,
#[cfg(feature = "processing")]
redis_pool: None,
geoip_lookup: None,
global_config,
#[cfg(feature = "processing")]
aggregator,
};

EnvelopeProcessorService {
Expand Down
48 changes: 48 additions & 0 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,18 @@ pub struct ItemHeaders {
#[serde(default, skip)]
rate_limited: bool,

/// Contains the amount of events this item was generated and aggregated from.
///
/// A [metrics buckets](`ItemType::MetricBuckets`) item contains metrics extracted and
/// aggregated from (currently) transactions and profiles.
///
/// This information cannot be directly inferred from the item itself anymore.
/// The amount of events this item/metric represents is instead stored here.
///
/// NOTE: This is internal-only and not exposed into the Envelope.
#[serde(default, skip)]
source_quantities: Option<SourceQuantities>,

/// A list of cumulative sample rates applied to this event.
///
/// Multiple entries in `sample_rates` mean that the event was sampled multiple times. The
Expand All @@ -501,6 +513,17 @@ pub struct ItemHeaders {
other: BTreeMap<String, Value>,
}

/// Quantities of the source data an item was derived from.
///
/// A metric bucket, for instance, may have been derived and aggregated from several
/// transactions; the counts of those originating events are kept here.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SourceQuantities {
    /// Number of transactions the item was derived from.
    pub transactions: usize,
    /// Number of profiles the item was derived from.
    pub profiles: usize,
}

#[derive(Clone, Debug)]
pub struct Item {
headers: ItemHeaders,
Expand All @@ -519,6 +542,7 @@ impl Item {
filename: None,
routing_hint: None,
rate_limited: false,
source_quantities: None,
sample_rates: None,
other: BTreeMap::new(),
metrics_extracted: false,
Expand Down Expand Up @@ -666,6 +690,16 @@ impl Item {
self.headers.sample_rates.take()
}

/// Returns the source quantities stored in this item's headers.
///
/// Yields `None` while the header field is unset. The value is returned as a copy, so the
/// item itself is left untouched.
pub fn source_quantities(&self) -> Option<SourceQuantities> {
self.headers.source_quantities
}

/// Stores `source_quantities` in this item's headers, replacing any previously set value.
pub fn set_source_quantities(&mut self, source_quantities: SourceQuantities) {
self.headers.source_quantities = Some(source_quantities);
}

/// Sets sample rates for this item.
pub fn set_sample_rates(&mut self, sample_rates: Value) {
if matches!(sample_rates, Value::Array(ref a) if !a.is_empty()) {
Expand Down Expand Up @@ -1343,6 +1377,20 @@ mod tests {
assert_eq!(item.routing_hint(), Some(uuid));
}

/// A new item has no source quantities; once set, the getter returns exactly what was stored.
#[test]
fn test_item_source_quantities() {
    let mut item = Item::new(ItemType::MetricBuckets);
    assert_eq!(item.source_quantities(), None);

    let expected = SourceQuantities {
        transactions: 12,
        profiles: 0,
    };
    item.set_source_quantities(expected);

    assert_eq!(item.source_quantities(), Some(expected));
}

#[test]
fn test_envelope_empty() {
let event_id = EventId::new();
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl ServiceState {
project_cache.clone(),
global_config.clone(),
upstream_relay.clone(),
test_store.clone(),
#[cfg(feature = "processing")]
aggregator.clone(),
)
Expand Down
40 changes: 28 additions & 12 deletions relay-server/src/utils/managed_envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,28 +111,30 @@ impl ManagedEnvelope {
}
}

/// Creates a standalone envelope for testing purposes.
///
/// As opposed to [`new`](Self::new), this does not require a queue permit. This makes it
/// suitable for unit testing internals of the processing pipeline.
#[cfg(test)]
pub fn standalone(
pub fn untracked(
envelope: Box<Envelope>,
outcome_aggregator: Addr<TrackOutcome>,
test_store: Addr<TestStore>,
) -> Self {
Self::new_internal(envelope, None, outcome_aggregator, test_store)
let mut envelope = Self::new_internal(envelope, None, outcome_aggregator, test_store);
envelope.context.done = true;
envelope
}

#[cfg(test)]
pub fn untracked(
/// Creates a new managed envelope like [`new`](Self::new) but without a queue permit.
///
/// This is suitable for aggregated metrics. Metrics live outside the lifecycle of a normal
/// event. They are extracted, aggregated and regularly flushed, after the
/// source event has already been processed.
///
/// The constructor is also suitable for unit testing internals of the processing pipeline.
pub fn standalone(
envelope: Box<Envelope>,
outcome_aggregator: Addr<TrackOutcome>,
test_store: Addr<TestStore>,
) -> Self {
let mut envelope = Self::new_internal(envelope, None, outcome_aggregator, test_store);
envelope.context.done = true;
envelope
Self::new_internal(envelope, None, outcome_aggregator, test_store)
}

/// Computes a managed envelope from the given envelope and binds it to the processing queue.
Expand Down Expand Up @@ -298,6 +300,7 @@ impl ManagedEnvelope {
tags.has_attachments = summary.attachment_quantity > 0,
tags.has_sessions = summary.session_quantity > 0,
tags.has_profiles = summary.profile_quantity > 0,
tags.has_transactions = summary.secondary_transaction_quantity > 0,
tags.has_replays = summary.replay_quantity > 0,
tags.has_checkins = summary.checkin_quantity > 0,
tags.event_category = ?summary.event_category,
Expand Down Expand Up @@ -326,7 +329,7 @@ impl ManagedEnvelope {

if self.context.summary.profile_quantity > 0 {
self.track_outcome(
outcome,
outcome.clone(),
if self.use_index_category() {
DataCategory::ProfileIndexed
} else {
Expand All @@ -336,6 +339,19 @@ impl ManagedEnvelope {
);
}

// Track outcomes for attached secondary transactions, e.g. extracted from metrics.
//
// Primary transaction count is already tracked through the event category
// (see: `Self::event_category()`).
if self.context.summary.secondary_transaction_quantity > 0 {
self.track_outcome(
outcome,
// Secondary transaction counts are never indexed transactions
DataCategory::Transaction,
self.context.summary.secondary_transaction_quantity,
);
}

self.finish(RelayCounters::EnvelopeRejected, handling);
}

Expand Down
49 changes: 48 additions & 1 deletion relay-server/src/utils/rate_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,16 @@ pub struct EnvelopeSummary {
/// The number of monitor check-ins.
pub checkin_quantity: usize,

/// Secondary number of transactions.
///
/// This is 0 for envelopes which contain a transaction. Only secondary transaction
/// quantities should be tracked here, for example transaction counts extracted from metrics.
///
/// A "primary" transaction is contained within the envelope,
/// marking the envelope data category a [`DataCategory::Transaction`].
pub secondary_transaction_quantity: usize,

/// Indicates that the envelope contains regular attachments that do not create event payloads.
pub has_plain_attachments: bool,

Expand Down Expand Up @@ -177,6 +187,11 @@ impl EnvelopeSummary {
continue;
}

if let Some(source_quantities) = item.source_quantities() {
summary.secondary_transaction_quantity += source_quantities.transactions;
summary.profile_quantity += source_quantities.profiles;
}

summary.payload_size += item.len();
summary.set_quantity(item);
}
Expand Down Expand Up @@ -652,7 +667,10 @@ mod tests {
use smallvec::smallvec;

use super::*;
use crate::envelope::{AttachmentType, ContentType};
use crate::{
envelope::{AttachmentType, ContentType, SourceQuantities},
extractors::RequestMeta,
};

#[test]
fn test_format_rate_limits() {
Expand Down Expand Up @@ -1237,4 +1255,33 @@ mod tests {
mock.assert_call(DataCategory::TransactionIndexed, Some(1));
mock.assert_call(DataCategory::Attachment, None);
}

/// Source quantities of all items in an envelope are summed into the envelope summary.
#[test]
fn test_source_quantity_for_total_quantity() {
    let request_meta = RequestMeta::new(
        "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap(),
    );
    let mut envelope = Envelope::from_request(None, request_meta);

    // One metric buckets item per (transactions, profiles) pair.
    for (transactions, profiles) in [(5, 2), (2, 0)] {
        let mut item = Item::new(ItemType::MetricBuckets);
        item.set_source_quantities(SourceQuantities {
            transactions,
            profiles,
        });
        envelope.add_item(item);
    }

    let summary = EnvelopeSummary::compute(&envelope);

    assert_eq!(summary.profile_quantity, 2);
    assert_eq!(summary.secondary_transaction_quantity, 7);
}
}

0 comments on commit 43d469b

Please sign in to comment.