Skip to content

Commit

Permalink
ref: Key by scoping instead
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-auer committed Dec 22, 2023
1 parent 2ad53e9 commit 710f3f7
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 18 deletions.
16 changes: 6 additions & 10 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,21 +441,19 @@ impl EncodeEnvelope {
}
}

/// Metric buckets with additional project and scoping information.
/// Metric buckets with additional project information.
#[derive(Debug)]
pub struct ProjectMetrics {
/// The metric buckets to encode.
pub buckets: Vec<Bucket>,
/// Scoping for metric buckets.
pub scoping: Scoping,
/// Project state for extracting quotas.
pub project_state: Arc<ProjectState>,
}

/// Encodes metrics into an envelope ready to be sent upstream.
#[derive(Debug)]
pub struct EncodeMetrics {
pub buckets: HashMap<ProjectKey, ProjectMetrics>,
pub scopes: HashMap<Scoping, ProjectMetrics>,
}

/// Encodes metric meta into an envelope and sends it upstream.
Expand Down Expand Up @@ -1387,7 +1385,7 @@ impl EnvelopeProcessorService {
match rate_limiter.is_rate_limited(quotas, item_scoping, quantities.buckets, false) {
Ok(limits) if limits.is_limited() => {
relay_log::debug!(
"dropping {} buckets due to throughput ratelimit",
"dropping {} buckets due to throughput rate limit",
quantities.buckets
);

Expand Down Expand Up @@ -1428,10 +1426,9 @@ impl EnvelopeProcessorService {
use crate::actors::store::StoreMetrics;
use crate::constants::DEFAULT_EVENT_RETENTION;

for message in message.buckets.into_values() {
for (scoping, message) in message.scopes {
let ProjectMetrics {
mut buckets,
scoping,
project_state,
} = message;

Expand Down Expand Up @@ -1492,10 +1489,9 @@ impl EnvelopeProcessorService {
let batch_size = self.inner.config.metrics_max_batch_size_bytes();
let upstream = self.inner.config.upstream_descriptor();

for (project_key, message) in message.buckets {
for (scoping, message) in message.scopes {
let ProjectMetrics {
buckets,
scoping,
project_state,
} = message;

Expand All @@ -1505,7 +1501,7 @@ impl EnvelopeProcessorService {
};

let dsn = PartialDsn::outbound(&scoping, upstream);
let partitions = partition_buckets(project_key, buckets, partition_count);
let partitions = partition_buckets(scoping.project_key, buckets, partition_count);

for (partition_key, buckets) in partitions {
let mut num_batches = 0;
Expand Down
11 changes: 6 additions & 5 deletions relay-server/src/actors/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ impl Project {
&mut self,
outcome_aggregator: Addr<TrackOutcome>,
buckets: Vec<Bucket>,
) -> Option<ProjectMetrics> {
) -> Option<(Scoping, ProjectMetrics)> {
let len = buckets.len();
let Some(project_state) = self.valid_state() else {
relay_log::trace!("there is no project state: dropping {len} buckets",);
Expand All @@ -1130,7 +1130,7 @@ impl Project {
.check_with_quotas(project_state.get_quotas(), item_scoping);

if limits.is_limited() {
relay_log::info!("dropping {len} buckets due to rate limit");
relay_log::debug!("dropping {len} buckets due to rate limit");

let mode = match project_state.config.transaction_metrics {
Some(ErrorBoundary::Ok(ref c)) if c.usage_metric() => ExtractionMode::Usage,
Expand All @@ -1148,11 +1148,12 @@ impl Project {
return None;
}

Some(ProjectMetrics {
let project_metrics = ProjectMetrics {
buckets,
scoping,
project_state,
})
};

Some((scoping, project_metrics))
}
}

Expand Down
6 changes: 3 additions & 3 deletions relay-server/src/actors/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,14 +887,14 @@ impl ProjectCacheBroker {
for (project_key, buckets) in message.buckets {
let outcome_aggregator = self.services.outcome_aggregator.clone();
let project = self.get_or_create_project(project_key);
if let Some(b) = project.check_buckets(outcome_aggregator, buckets) {
output.insert(project_key, b);
if let Some((scoping, b)) = project.check_buckets(outcome_aggregator, buckets) {
output.insert(scoping, b);
}
}

self.services
.envelope_processor
.send(EncodeMetrics { buckets: output })
.send(EncodeMetrics { scopes: output })
}

fn handle_buffer_index(&mut self, message: UpdateBufferIndex) {
Expand Down

0 comments on commit 710f3f7

Please sign in to comment.