fix(metrics): Fetch project states during aggregator shutdown [INGEST-784 INGEST-997] #1205

Merged: 5 commits, Mar 7, 2022
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,10 @@
 
 - Tag transaction metrics by user satisfaction. ([#1197](https://github.com/getsentry/relay/pull/1197))
 
+**Bug Fixes**:
+
+- Prevent dropping metrics during Relay shutdown if the project is outdated or not cached at the time of shutdown. ([#1205](https://github.com/getsentry/relay/pull/1205))
+
 **Internal**:
 
 - Spread out metric aggregation over the aggregation window to avoid concentrated waves of metrics requests to the upstream every 10 seconds. Relay now applies jitter to `initial_delay` to spread out requests more evenly over time. ([#1185](https://github.com/getsentry/relay/pull/1185))
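The jitter described in the last changelog bullet was introduced in #1185 and does not appear in this diff. As a rough sketch of the idea only — the function name, signature, and modulo arithmetic below are illustrative assumptions, not Relay's actual implementation:

```rust
use std::time::Duration;

/// Hypothetical sketch: derive a stable per-key jitter so that flushes
/// spread over the bucket interval instead of arriving in one wave.
fn initial_delay_with_jitter(
    initial_delay: Duration,
    bucket_interval: Duration,
    hash_key: u64,
) -> Duration {
    // Map the hash onto [0, bucket_interval) and add it to the base delay.
    let jitter_millis = hash_key % bucket_interval.as_millis() as u64;
    initial_delay + Duration::from_millis(jitter_millis)
}

fn main() {
    let delay = initial_delay_with_jitter(Duration::from_secs(30), Duration::from_secs(10), 0xDEADBEEF);
    println!("flush after {:?}", delay); // 30s plus a stable sub-10s offset
}
```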
64 changes: 36 additions & 28 deletions relay-metrics/src/aggregation.rs
@@ -967,6 +967,11 @@ impl Message for FlushBuckets {
     type Result = Result<(), Vec<Bucket>>;
 }
 
+enum AggregatorState {
+    Running,
+    ShuttingDown,
+}
+
 /// A collector of [`Metric`] submissions.
 ///
 /// # Aggregation
@@ -1022,6 +1027,7 @@ pub struct Aggregator {
     config: AggregatorConfig,
     buckets: HashMap<BucketKey, QueuedBucket>,
     receiver: Recipient<FlushBuckets>,
+    state: AggregatorState,
 }

 impl Aggregator {
@@ -1034,6 +1040,7 @@ impl Aggregator {
             config,
             buckets: HashMap::new(),
             receiver,
+            state: AggregatorState::Running,
         }
     }

@@ -1142,11 +1149,13 @@ impl Aggregator {
     /// Pop and return the buckets that are eligible for flushing out according to bucket interval.
     ///
    /// Note that this function is primarily intended for tests.
-    pub fn pop_flush_buckets(&mut self, force: bool) -> HashMap<ProjectKey, Vec<Bucket>> {
+    pub fn pop_flush_buckets(&mut self) -> HashMap<ProjectKey, Vec<Bucket>> {
         relay_statsd::metric!(gauge(MetricGauges::Buckets) = self.buckets.len() as u64);
 
         let mut buckets = HashMap::<ProjectKey, Vec<Bucket>>::new();
 
+        let force = matches!(&self.state, AggregatorState::ShuttingDown);
+
         relay_statsd::metric!(timer(MetricTimers::BucketsScanDuration), {
             let bucket_interval = self.config.bucket_interval;
             self.buckets.retain(|key, entry| {

[Review comment from the PR author on this `retain` call]

@jan-auer I did not refactor this into a force and non-force branch after all. Since we need to iterate through `self.buckets` in order to construct the return value, I did not see much benefit.

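For readers skimming the diff, the single pass the author describes has roughly this shape — a standalone sketch with simplified stand-in types (`BucketKey`, `QueuedBucket`, plain `u64` timestamps and values), not Relay's real definitions:

```rust
use std::collections::HashMap;

#[derive(Clone, Hash, PartialEq, Eq)]
struct BucketKey {
    project: String,
}

struct QueuedBucket {
    flush_at: u64,
    value: u64,
}

/// One pass over the cache both builds the return value (eligible buckets,
/// grouped by project) and retains the not-yet-eligible buckets in place.
/// `force` short-circuits the eligibility check, as during shutdown.
fn pop_flush_buckets(
    buckets: &mut HashMap<BucketKey, QueuedBucket>,
    now: u64,
    force: bool,
) -> HashMap<String, Vec<u64>> {
    let mut flushed: HashMap<String, Vec<u64>> = HashMap::new();
    buckets.retain(|key, entry| {
        if force || entry.flush_at <= now {
            flushed.entry(key.project.clone()).or_default().push(entry.value);
            false // eligible: move it into the result, drop it from the cache
        } else {
            true // keep it for a later flush
        }
    });
    flushed
}

fn main() {
    let mut cache = HashMap::new();
    cache.insert(
        BucketKey { project: "p1".into() },
        QueuedBucket { flush_at: 10, value: 42 },
    );
    let flushed = pop_flush_buckets(&mut cache, 20, false);
    assert_eq!(flushed["p1"], vec![42]);
    assert!(cache.is_empty());
}
```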
@@ -1169,8 +1178,8 @@ impl Aggregator {
     ///
     /// If the receiver returns buckets, they are merged back into the cache.
     /// If `force` is true, flush all buckets unconditionally and do not attempt to merge back.
-    fn try_flush(&mut self, context: &mut <Self as Actor>::Context, force: bool) {
-        let flush_buckets = self.pop_flush_buckets(force);
+    fn try_flush(&mut self, context: &mut <Self as Actor>::Context) {
+        let flush_buckets = self.pop_flush_buckets();
 
         if flush_buckets.is_empty() {
             return;
@@ -1179,7 +1188,6 @@
         relay_log::trace!("flushing {} projects to receiver", flush_buckets.len());
 
         let mut total_bucket_count = 0u64;
-        let merge_back = !force;
         for (project_key, project_buckets) in flush_buckets.into_iter() {
             let bucket_count = project_buckets.len() as u64;
             relay_statsd::metric!(
@@ -1197,22 +1205,11 @@
                 .into_actor(self)
                 .and_then(move |result, slf, _ctx| {
                     if let Err(buckets) = result {
-                        if merge_back {
-                            relay_log::trace!(
-                                "returned {} buckets from receiver, merging back",
-                                buckets.len()
-                            );
-                            slf.merge_all(project_key, buckets).ok();
-                        } else {
-                            // NOTE: This means we drop buckets if the project state is expired.
-                            relay_log::error!(
-                                "returned {} buckets from receiver, dropping",
-                                buckets.len()
-                            );
-                            relay_statsd::metric!(
-                                counter(MetricCounters::BucketsDropped) += buckets.len() as i64
-                            );
-                        }
+                        relay_log::trace!(
+                            "returned {} buckets from receiver, merging back",
+                            buckets.len()
+                        );
+                        slf.merge_all(project_key, buckets).ok();
                     } else {
                         for (bucket_type, bucket_relative_size) in size_statsd_metrics {
                             relay_statsd::metric!(
@@ -1253,7 +1250,7 @@ impl Actor for Aggregator {
 
         // TODO: Consider a better approach than busy polling
        ctx.run_interval(FLUSH_INTERVAL, |slf, context| {
-            slf.try_flush(context, false);
+            slf.try_flush(context);
         });
     }

@@ -1275,16 +1272,27 @@ impl SystemService for Aggregator {}
 impl Handler<Shutdown> for Aggregator {
     type Result = Result<(), ()>;
 
-    fn handle(&mut self, message: Shutdown, context: &mut Self::Context) -> Self::Result {
+    fn handle(&mut self, message: Shutdown, _context: &mut Self::Context) -> Self::Result {
         if message.timeout.is_some() {
             relay_log::trace!("Shutting down...");
             // is graceful shutdown
-            self.try_flush(context, true);
+            self.state = AggregatorState::ShuttingDown;
         }
         Ok(())
     }
 }
 
+impl Drop for Aggregator {
+    fn drop(&mut self) {
+        let remaining_buckets = self.buckets.len();
+        if remaining_buckets > 0 {
+            relay_log::error!("Metrics aggregator dropping {} buckets", remaining_buckets);
+            relay_statsd::metric!(
+                counter(MetricCounters::BucketsDropped) += remaining_buckets as i64
+            );
+        }
+    }
+}
+
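Taken together, these changes invert the shutdown flow: the `Shutdown` handler no longer flushes synchronously but only flips the aggregator into `ShuttingDown`; every subsequent tick of the flush interval then flushes with `force` enabled, and buckets the receiver rejects are merged back and retried rather than dropped, so metrics survive until project states are fetched. Only buckets still cached when the aggregator itself is dropped are counted as lost, in the new `Drop` impl. A condensed, non-actor sketch of that control flow (all names and types below are simplified stand-ins, not Relay's code):

```rust
enum AggregatorState {
    Running,
    ShuttingDown,
}

struct Aggregator {
    state: AggregatorState,
    buckets: Vec<u64>,
}

impl Aggregator {
    /// Graceful shutdown no longer flushes synchronously; it only flips the state.
    fn shutdown(&mut self) {
        self.state = AggregatorState::ShuttingDown;
    }

    /// One tick of the periodic flush interval. During shutdown everything is
    /// flushed unconditionally, and rejected buckets are merged back for a retry.
    fn tick(&mut self, send: impl FnOnce(Vec<u64>) -> Result<(), Vec<u64>>) {
        let force = matches!(self.state, AggregatorState::ShuttingDown);
        let eligible: Vec<u64> = if force {
            self.buckets.drain(..).collect()
        } else {
            Vec::new() // the real code pops only buckets whose flush deadline passed
        };
        if let Err(returned) = send(eligible) {
            self.buckets.extend(returned); // merge back instead of dropping
        }
    }
}

fn main() {
    let mut agg = Aggregator { state: AggregatorState::Running, buckets: vec![1, 2, 3] };
    agg.shutdown();
    agg.tick(|b| Err(b)); // receiver not ready: buckets come back
    assert_eq!(agg.buckets.len(), 3);
    agg.tick(|_| Ok(())); // receiver accepts: cache drains
    assert!(agg.buckets.is_empty());
}
```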
/// A message containing a list of [`Metric`]s to be inserted into the aggregator.
#[derive(Debug)]
pub struct InsertMetrics {
@@ -1742,8 +1750,8 @@ mod tests {
 
         let buckets: Vec<_> = aggregator
             .buckets
-            .into_iter()
-            .map(|(k, e)| (k, e.value)) // skip flush times, they are different every time
+            .iter()
+            .map(|(k, e)| (k, &e.value)) // skip flush times, they are different every time
             .collect();
 
         insta::assert_debug_snapshot!(buckets, @r###"
@@ -1790,8 +1798,8 @@
 
         let mut buckets: Vec<_> = aggregator
             .buckets
-            .into_iter()
-            .map(|(k, e)| (k, e.value)) // skip flush times, they are different every time
+            .iter()
+            .map(|(k, e)| (k, &e.value)) // skip flush times, they are different every time
             .collect();
 
         buckets.sort_by(|a, b| a.0.timestamp.cmp(&b.0.timestamp));
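The two test changes above are forced by the new `Drop` impl: Rust does not allow moving a field out of a value whose type implements `Drop` (error E0509), so the tests now borrow the buckets with `iter()` instead of consuming the map with `into_iter()`. A minimal reproduction with a stand-in type:

```rust
struct Holder {
    buckets: Vec<u32>,
}

impl Drop for Holder {
    fn drop(&mut self) {
        // Stand-in for the aggregator's "dropping N buckets" error log.
        if !self.buckets.is_empty() {
            eprintln!("dropping {} buckets", self.buckets.len());
        }
    }
}

fn main() {
    let holder = Holder { buckets: vec![1, 2, 3] };

    // let moved = holder.buckets;
    // ^ error[E0509]: cannot move out of type `Holder`, which implements the `Drop` trait

    // Borrowing works, which is why the tests switch to `iter()`:
    let snapshot: Vec<&u32> = holder.buckets.iter().collect();
    assert_eq!(snapshot.len(), 3);
}
```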