Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: Use global metric submission #2882

Closed
wants to merge 1 commit into a base branch from a head branch (branch names not captured in this extraction)
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,8 @@ struct Http {
/// - `gzip` (default): Compression using gzip.
/// - `br`: Compression using the brotli algorithm.
encoding: HttpEncoding,
/// TODO(ja): Document
batch_metrics: bool,
}

impl Default for Http {
Expand All @@ -771,6 +773,7 @@ impl Default for Http {
retry_delay: default_retry_delay(),
project_failure_interval: default_project_failure_interval(),
encoding: HttpEncoding::Gzip,
batch_metrics: false,
}
}
}
Expand Down Expand Up @@ -1722,6 +1725,11 @@ impl Config {
self.values.http.encoding
}

/// TODO(ja): Doc
pub fn http_batch_metrics(&self) -> bool {
self.values.http.batch_metrics
}

/// Returns whether this Relay should emit outcomes.
///
/// This is `true` either if `outcomes.emit_outcomes` is explicitly enabled, or if this Relay is
Expand Down
36 changes: 17 additions & 19 deletions relay-metrics/src/aggregatorservice.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::time::Duration;

use relay_base_schema::project::ProjectKey;
Expand Down Expand Up @@ -206,10 +207,8 @@ pub struct BucketCountInquiry;
/// failed buckets. They will be merged back into the aggregator and flushed at a later time.
#[derive(Clone, Debug)]
pub struct FlushBuckets {
/// The project key.
pub project_key: ProjectKey,
/// The buckets to be flushed.
pub buckets: Vec<Bucket>,
pub buckets: HashMap<ProjectKey, Vec<Bucket>>,
}

enum AggregatorState {
Expand Down Expand Up @@ -265,37 +264,35 @@ impl AggregatorService {
///
/// If `force` is true, flush all buckets unconditionally and do not attempt to merge back.
fn try_flush(&mut self) {
let flush_buckets = {
let force_flush = matches!(&self.state, AggregatorState::ShuttingDown);
self.aggregator.pop_flush_buckets(force_flush)
};
let force_flush = matches!(&self.state, AggregatorState::ShuttingDown);
let buckets = self.aggregator.pop_flush_buckets(force_flush);

if flush_buckets.is_empty() {
if buckets.is_empty() {
return;
}

relay_log::trace!("flushing {} projects to receiver", flush_buckets.len());
relay_log::trace!("flushing {} projects to receiver", buckets.len());

// TODO(ja): Check if we want to log this somewhere else
let mut total_bucket_count = 0u64;
for (project_key, buckets) in flush_buckets.into_iter() {
for buckets in buckets.values() {
let bucket_count = buckets.len() as u64;
total_bucket_count += bucket_count;

relay_statsd::metric!(
histogram(MetricHistograms::BucketsFlushedPerProject) = bucket_count,
aggregator = self.aggregator.name(),
);
total_bucket_count += bucket_count;

if let Some(ref receiver) = self.receiver {
receiver.send(FlushBuckets {
project_key,
buckets,
});
}
}

relay_statsd::metric!(
histogram(MetricHistograms::BucketsFlushed) = total_bucket_count,
aggregator = self.aggregator.name(),
);

if let Some(ref receiver) = self.receiver {
receiver.send(FlushBuckets { buckets })
}
}

fn handle_merge_buckets(&mut self, msg: MergeBuckets) {
Expand Down Expand Up @@ -431,7 +428,8 @@ mod tests {
}

impl TestReceiver {
fn add_buckets(&self, buckets: Vec<Bucket>) {
fn add_buckets(&self, buckets: HashMap<ProjectKey, Vec<Bucket>>) {
let buckets = buckets.into_values().flatten();
self.data.write().unwrap().buckets.extend(buckets);
}

Expand Down
Loading
Loading