produce outcomes for metrics
Dav1dde committed Nov 17, 2023
1 parent 49b6d7d commit da0ebea
Showing 9 changed files with 287 additions and 76 deletions.
11 changes: 9 additions & 2 deletions relay-server/src/actors/envelopes.rs
@@ -27,7 +27,7 @@ use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType
use crate::extractors::{PartialDsn, RequestMeta};
use crate::http::{HttpError, Request, RequestBuilder, Response};
use crate::statsd::RelayHistograms;
use crate::utils::ManagedEnvelope;
use crate::utils::{ExtractionMode, ManagedEnvelope};

use super::processor::EncodeMetrics;

@@ -149,6 +149,8 @@ pub struct SendMetrics {
pub buckets: Vec<Bucket>,
/// Scoping information for the metrics.
pub scoping: Scoping,
/// Transaction extraction mode.
pub extraction_mode: ExtractionMode,
}

/// Dispatch service for generating and submitting Envelopes.
@@ -329,7 +331,11 @@ impl EnvelopeManagerService {
}

async fn handle_send_metrics(&self, message: SendMetrics) {
let SendMetrics { buckets, scoping } = message;
let SendMetrics {
buckets,
scoping,
extraction_mode,
} = message;

#[allow(unused_mut)]
let mut partitions = self.config.metrics_partitions();
@@ -347,6 +353,7 @@ impl EnvelopeManagerService {
self.envelope_processor.send(EncodeMetrics {
buckets,
scoping,
extraction_mode,
max_batch_size_bytes,
partitions,
});
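`ExtractionMode` is imported from `crate::utils` but defined nowhere in this diff. A minimal sketch of the shape the surrounding code implies — a small copyable enum with a `from_usage` constructor; the variant names and method body here are assumptions inferred from the `ExtractionMode::from_usage(usage)` calls in project.rs further down:

```rust
/// Sketch of the mode threaded through `SendMetrics` and `EncodeMetrics`.
/// The real type lives in `crate::utils`; the variant names are assumptions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExtractionMode {
    /// Transaction counts come from a dedicated usage metric.
    Usage,
    /// Transaction counts are derived from the duration metric.
    Duration,
}

impl ExtractionMode {
    /// Mirrors the `ExtractionMode::from_usage(usage)` calls in project.rs.
    pub fn from_usage(usage: bool) -> Self {
        if usage { Self::Usage } else { Self::Duration }
    }
}

fn main() {
    assert_eq!(ExtractionMode::from_usage(true), ExtractionMode::Usage);
    assert_eq!(ExtractionMode::from_usage(false), ExtractionMode::Duration);
}
```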
52 changes: 43 additions & 9 deletions relay-server/src/actors/processor.rs
@@ -72,16 +72,19 @@ use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::actors::project::ProjectState;
use crate::actors::project_cache::ProjectCache;
use crate::actors::upstream::{SendRequest, UpstreamRelay};
use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType, SourceQuantities};
use crate::extractors::{PartialDsn, RequestMeta};
use crate::metrics_extraction::transactions::types::ExtractMetricsError;
use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor};
use crate::service::ServiceError;
use crate::statsd::{PlatformTag, RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{
self, ChunkedFormDataAggregator, FormDataIter, ItemAction, ManagedEnvelope, SamplingResult,
self, extract_transaction_count, ChunkedFormDataAggregator, ExtractionMode, FormDataIter,
ItemAction, ManagedEnvelope, SamplingResult,
};

use super::test_store::TestStore;

/// The minimum clock drift for correction to apply.
const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

@@ -490,6 +493,8 @@ pub struct EncodeMetrics {
pub buckets: Vec<Bucket>,
/// Scoping for metric buckets.
pub scoping: Scoping,
/// Transaction metrics extraction mode.
pub extraction_mode: ExtractionMode,
/// Approximate size in bytes to batch buckets.
pub max_batch_size_bytes: usize,
/// Amount of logical partitions for the buckets.
@@ -577,6 +582,7 @@ struct InnerProcessor {
#[cfg(feature = "processing")]
aggregator: Addr<Aggregator>,
upstream_relay: Addr<UpstreamRelay>,
test_store: Addr<TestStore>,
#[cfg(feature = "processing")]
rate_limiter: Option<RedisRateLimiter>,
geoip_lookup: Option<GeoIpLookup>,
@@ -593,6 +599,7 @@ impl EnvelopeProcessorService {
project_cache: Addr<ProjectCache>,
global_config: Addr<GlobalConfigManager>,
upstream_relay: Addr<UpstreamRelay>,
test_store: Addr<TestStore>,
#[cfg(feature = "processing")] aggregator: Addr<Aggregator>,
) -> Self {
let geoip_lookup = config.geoip_path().and_then(|p| {
@@ -617,6 +624,7 @@
global_config,
outcome_aggregator,
upstream_relay,
test_store,
geoip_lookup,
#[cfg(feature = "processing")]
aggregator,
@@ -3082,6 +3090,7 @@ impl EnvelopeProcessorService {
buckets,
scoping,
max_batch_size_bytes,
extraction_mode,
partitions,
} = message;

@@ -3100,11 +3109,14 @@
let mut num_batches = 0;

for batch in BucketsView::new(&buckets).by_size(max_batch_size_bytes) {
#[allow(clippy::redundant_closure_call)]
let mut envelope: ManagedEnvelope = (move || {
let _ = dsn;
todo!("Added by next commit in the PR")
})();
let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
envelope.add_item(create_metrics_item(&batch, extraction_mode));

let mut envelope = ManagedEnvelope::standalone(
envelope,
self.inner.outcome_aggregator.clone(),
self.inner.test_store.clone(),
);
envelope.set_partition_key(partition_key).scope(scoping);

relay_statsd::metric!(
@@ -3186,6 +3198,26 @@ impl Service for EnvelopeProcessorService {
}
}

fn create_metrics_item(buckets: &BucketsView<'_>, extraction_mode: ExtractionMode) -> Item {
let source_quantities = buckets
.iter()
.filter_map(|bucket| extract_transaction_count(&bucket, extraction_mode))
.fold(SourceQuantities::default(), |acc, c| {
let profile_count = if c.has_profile { c.count } else { 0 };

SourceQuantities {
transactions: acc.transactions + c.count,
profiles: acc.profiles + profile_count,
}
});

let mut item = Item::new(ItemType::MetricBuckets);
item.set_source_quantities(source_quantities);
item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());

item
}

#[cfg(test)]
mod tests {
use std::env;
@@ -3621,22 +3653,24 @@ mod tests {
let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {});
let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {});
let (global_config, _) = mock_service("global_config", (), |&mut (), _| {});
let (test_store, _) = mock_service("test_store", (), |&mut (), _| {});
#[cfg(feature = "processing")]
let (aggregator, _) = mock_service("aggregator", (), |&mut (), _| {});
let inner = InnerProcessor {
config: Arc::new(config),
envelope_manager,
project_cache,
outcome_aggregator,
#[cfg(feature = "processing")]
aggregator,
upstream_relay,
test_store,
#[cfg(feature = "processing")]
rate_limiter: None,
#[cfg(feature = "processing")]
redis_pool: None,
geoip_lookup: None,
global_config,
#[cfg(feature = "processing")]
aggregator,
};

EnvelopeProcessorService {
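The fold in `create_metrics_item` is the heart of this change: every bucket that yields a transaction count contributes `count` transactions, and additionally `count` profiles when a profile was attached. A self-contained sketch of that counting rule — the `TransactionCount` struct name is an assumption; its `count`/`has_profile` fields follow the accesses in the diff:

```rust
/// Assumed shape of what `extract_transaction_count` yields; the field
/// names follow the `c.count` / `c.has_profile` accesses in the diff.
struct TransactionCount {
    count: usize,
    has_profile: bool,
}

#[derive(Debug, Default, PartialEq, Eq)]
struct SourceQuantities {
    transactions: usize,
    profiles: usize,
}

/// Same fold as in `create_metrics_item`, lifted out for illustration.
fn fold_quantities(counts: &[TransactionCount]) -> SourceQuantities {
    counts.iter().fold(SourceQuantities::default(), |acc, c| {
        // A bucket only counts towards profiles if a profile was attached.
        let profile_count = if c.has_profile { c.count } else { 0 };
        SourceQuantities {
            transactions: acc.transactions + c.count,
            profiles: acc.profiles + profile_count,
        }
    })
}

fn main() {
    let counts = [
        TransactionCount { count: 10, has_profile: false },
        TransactionCount { count: 5, has_profile: true },
    ];
    // 15 transactions in total, 5 of which also carried a profile.
    assert_eq!(
        fold_quantities(&counts),
        SourceQuantities { transactions: 15, profiles: 5 }
    );
}
```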
37 changes: 32 additions & 5 deletions relay-server/src/actors/project.rs
@@ -28,7 +28,9 @@ use crate::actors::project_cache::{CheckedEnvelope, ProjectCache, RequestUpdate}

use crate::extractors::RequestMeta;
use crate::statsd::RelayCounters;
use crate::utils::{EnvelopeLimiter, ManagedEnvelope, MetricsLimiter, RetryBackoff};
use crate::utils::{
EnvelopeLimiter, ExtractionMode, ManagedEnvelope, MetricsLimiter, RetryBackoff,
};

/// The expiry status of a project state. Return value of [`ProjectState::check_expiry`].
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
@@ -571,7 +573,8 @@ impl Project {
_ => false,
};

match MetricsLimiter::create(metrics, &state.config.quotas, scoping, usage) {
let mode = ExtractionMode::from_usage(usage);
match MetricsLimiter::create(metrics, &state.config.quotas, scoping, mode) {
Ok(mut limiter) => {
limiter.enforce_limits(Ok(&self.rate_limits), outcome_aggregator);
limiter.into_metrics()
@@ -644,8 +647,9 @@
Some(ErrorBoundary::Ok(ref c)) => c.usage_metric(),
_ => false,
};
let extraction_mode = ExtractionMode::from_usage(usage);

let buckets = match MetricsLimiter::create(buckets, quotas, scoping, usage) {
let buckets = match MetricsLimiter::create(buckets, quotas, scoping, extraction_mode) {
Ok(mut bucket_limiter) => {
let cached_rate_limits = self.rate_limits().clone();
#[allow(unused_variables)]
@@ -1023,14 +1027,37 @@ impl Project {
})
}

pub fn flush_buckets(&mut self, envelope_manager: Addr<EnvelopeManager>, buckets: Vec<Bucket>) {
pub fn flush_buckets(
&mut self,
project_cache: Addr<ProjectCache>,
envelope_manager: Addr<EnvelopeManager>,
buckets: Vec<Bucket>,
) {
let Some(project_state) = self.get_cached_state(project_cache, false) else {
relay_log::trace!(
"there is no project state: dropping {} buckets",
buckets.len()
);
return;
};

let Some(scoping) = self.scoping() else {
relay_log::trace!("there is no scoping: dropping {} buckets", buckets.len());
return;
};

let usage = match project_state.config.transaction_metrics {
Some(ErrorBoundary::Ok(ref c)) => c.usage_metric(),
_ => false,
};
let extraction_mode = ExtractionMode::from_usage(usage);

if !buckets.is_empty() {
envelope_manager.send(SendMetrics { buckets, scoping });
envelope_manager.send(SendMetrics {
buckets,
scoping,
extraction_mode,
});
}
}
}
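The same guard for reading the usage flag out of `transaction_metrics` now appears three times in this file. A runnable sketch of its fallback semantics — the stub types stand in for relay's real config types, and the `usage_metric` body here is a placeholder assumption:

```rust
/// Stand-in for relay's `ErrorBoundary`: a config entry either
/// deserialized cleanly or is retained as an error.
enum ErrorBoundary<T> {
    Ok(T),
    Err,
}

/// Stand-in for the transaction metrics config; `usage_metric()` is a
/// placeholder, the real method inspects the deserialized config.
struct TransactionMetricsConfig {
    usage: bool,
}

impl TransactionMetricsConfig {
    fn usage_metric(&self) -> bool {
        self.usage
    }
}

/// Mirrors the repeated match: only a present, well-formed config can opt
/// into usage-based extraction; a missing or broken config yields `false`.
fn usage_flag(config: Option<&ErrorBoundary<TransactionMetricsConfig>>) -> bool {
    match config {
        Some(ErrorBoundary::Ok(c)) => c.usage_metric(),
        _ => false,
    }
}

fn main() {
    let ok = ErrorBoundary::Ok(TransactionMetricsConfig { usage: true });
    let broken: ErrorBoundary<TransactionMetricsConfig> = ErrorBoundary::Err;
    assert!(usage_flag(Some(&ok)));
    assert!(!usage_flag(Some(&broken)));
    assert!(!usage_flag(None));
}
```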
3 changes: 2 additions & 1 deletion relay-server/src/actors/project_cache.rs
@@ -784,9 +784,10 @@ impl ProjectCacheBroker {

fn handle_flush_buckets(&mut self, message: FlushBuckets) {
let envelope_manager = self.services.envelope_manager.clone();
let project_cache = self.services.project_cache.clone();

self.get_or_create_project(message.project_key)
.flush_buckets(envelope_manager, message.buckets);
.flush_buckets(project_cache, envelope_manager, message.buckets);
}

fn handle_buffer_index(&mut self, message: UpdateBufferIndex) {
48 changes: 48 additions & 0 deletions relay-server/src/envelope.rs
@@ -480,6 +480,18 @@ pub struct ItemHeaders {
#[serde(default, skip)]
rate_limited: bool,

/// Contains the number of events this item was generated and aggregated from.
///
/// A [metrics buckets](`ItemType::MetricBuckets`) item contains metrics extracted and
/// aggregated from (currently) transactions and profiles.
///
/// This information can no longer be inferred directly from the item itself.
/// The number of events this item/metric represents is therefore stored here.
///
/// NOTE: This is internal-only and not exposed into the Envelope.
#[serde(default, skip)]
source_quantities: Option<SourceQuantities>,

/// A list of cumulative sample rates applied to this event.
///
/// Multiple entries in `sample_rates` mean that the event was sampled multiple times. The
@@ -501,6 +513,17 @@
other: BTreeMap<String, Value>,
}

/// Container for the quantities of events that an item was derived from.
///
/// For example, a metric bucket may be derived and aggregated from multiple transactions.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct SourceQuantities {
/// Transaction quantity.
pub transactions: usize,
/// Profile quantity.
pub profiles: usize,
}

#[derive(Clone, Debug)]
pub struct Item {
headers: ItemHeaders,
@@ -519,6 +542,7 @@ impl Item {
filename: None,
routing_hint: None,
rate_limited: false,
source_quantities: None,
sample_rates: None,
other: BTreeMap::new(),
metrics_extracted: false,
@@ -666,6 +690,16 @@ impl Item {
self.headers.sample_rates.take()
}

/// Returns the contained source quantities.
pub fn source_quantities(&self) -> Option<SourceQuantities> {
self.headers.source_quantities
}

/// Sets new source quantities.
pub fn set_source_quantities(&mut self, source_quantities: SourceQuantities) {
self.headers.source_quantities = Some(source_quantities);
}

/// Sets sample rates for this item.
pub fn set_sample_rates(&mut self, sample_rates: Value) {
if matches!(sample_rates, Value::Array(ref a) if !a.is_empty()) {
@@ -1343,6 +1377,20 @@ mod tests {
assert_eq!(item.routing_hint(), Some(uuid));
}

#[test]
fn test_item_source_quantities() {
let mut item = Item::new(ItemType::MetricBuckets);
assert!(item.source_quantities().is_none());

let source_quantities = SourceQuantities {
transactions: 12,
..Default::default()
};
item.set_source_quantities(source_quantities);

assert_eq!(item.source_quantities(), Some(source_quantities));
}

#[test]
fn test_envelope_empty() {
let event_id = EventId::new();
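The commit title, "produce outcomes for metrics", points at why this header exists: once a metric-buckets item is dropped, outcome reporting can no longer count the original events from the payload and has to read the stored quantities instead. A hypothetical sketch of such a consumer — `report_dropped`, the category strings, and the return shape are illustrative assumptions, not APIs from this commit:

```rust
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
struct SourceQuantities {
    transactions: usize,
    profiles: usize,
}

/// Hypothetical consumer: translate stored quantities into per-category
/// outcome counts when a metric-buckets item is discarded.
fn report_dropped(quantities: Option<SourceQuantities>) -> Vec<(&'static str, usize)> {
    let Some(q) = quantities else {
        // Nothing was recorded, so there is nothing to attribute.
        return Vec::new();
    };
    let mut outcomes = Vec::new();
    if q.transactions > 0 {
        outcomes.push(("transaction", q.transactions));
    }
    if q.profiles > 0 {
        outcomes.push(("profile", q.profiles));
    }
    outcomes
}

fn main() {
    let q = SourceQuantities { transactions: 12, profiles: 3 };
    assert_eq!(
        report_dropped(Some(q)),
        vec![("transaction", 12), ("profile", 3)]
    );
    assert!(report_dropped(None).is_empty());
}
```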
1 change: 1 addition & 0 deletions relay-server/src/service.rs
@@ -144,6 +144,7 @@ impl ServiceState {
project_cache.clone(),
global_config.clone(),
upstream_relay.clone(),
test_store.clone(),
#[cfg(feature = "processing")]
aggregator.clone(),
)