Merge branch 'master' into ref/actix-aws
* master:
  feat(metrics): Ignore transaction metrics allowlist (#1484)
  ref(metrics): Remove long running futures (#1492)
  ref(metrics): Convert metrics tests to insta (#1500)
  doc(metrics): Add bucket width to examples (#1496)
jan-auer committed Sep 27, 2022
2 parents 15c573e + be12cd4 commit 22d8e69
Showing 11 changed files with 493 additions and 466 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
 - Generate a new profile ID when splitting a profile for multiple transactions. ([#1473](https://github.com/getsentry/relay/pull/1473))
 - Pin Rust version to 1.63.0 in Dockerfile. ([#1482](https://github.com/getsentry/relay/pull/1482))
 - Normalize measurement units in event payload. ([#1488](https://github.com/getsentry/relay/pull/1488))
+- Remove long-running futures from metrics flush. ([#1492](https://github.com/getsentry/relay/pull/1492))

 ## 22.9.0
9 changes: 3 additions & 6 deletions relay-metrics/benches/aggregator.rs
@@ -6,8 +6,7 @@ use actix::prelude::*;
 use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};

 use relay_common::{ProjectKey, UnixTimestamp};
-use relay_metrics::{Aggregator, AggregatorConfig};
-use relay_metrics::{Bucket, FlushBuckets, Metric, MetricValue};
+use relay_metrics::{Aggregator, AggregatorConfig, FlushBuckets, Metric, MetricValue};

 #[derive(Clone, Default)]
 struct TestReceiver;
@@ -17,11 +16,9 @@ impl Actor for TestReceiver {
 }

 impl Handler<FlushBuckets> for TestReceiver {
-    type Result = Result<(), Vec<Bucket>>;
+    type Result = ();

-    fn handle(&mut self, _msg: FlushBuckets, _ctx: &mut Self::Context) -> Self::Result {
-        Ok(())
-    }
+    fn handle(&mut self, _msg: FlushBuckets, _ctx: &mut Self::Context) -> Self::Result {}
 }

 /// Struct representing a testcase for which insert + flush are timed.
103 changes: 50 additions & 53 deletions relay-metrics/src/aggregation.rs
@@ -669,6 +669,7 @@ pub struct ParseBucketError(#[cause] serde_json::Error);
 /// [
 ///   {
 ///     "timestamp": 1615889440,
+///     "width": 10,
 ///     "name": "endpoint.response_time",
 ///     "type": "d",
 ///     "unit": "millisecond",
@@ -679,6 +680,7 @@ pub struct ParseBucketError(#[cause] serde_json::Error);
 ///   },
 ///   {
 ///     "timestamp": 1615889440,
+///     "width": 10,
 ///     "name": "endpoint.hits",
 ///     "type": "c",
 ///     "value": 4,
@@ -688,6 +690,7 @@ pub struct ParseBucketError(#[cause] serde_json::Error);
 ///   },
 ///   {
 ///     "timestamp": 1615889440,
+///     "width": 10,
 ///     "name": "endpoint.parallel_requests",
 ///     "type": "g",
 ///     "value": {
@@ -700,6 +703,7 @@ pub struct ParseBucketError(#[cause] serde_json::Error);
 ///   },
 ///   {
 ///     "timestamp": 1615889440,
+///     "width": 10,
 ///     "name": "endpoint.users",
 ///     "type": "s",
 ///     "value": [
@@ -1164,7 +1168,7 @@ pub struct FlushBuckets {
 }

 impl Message for FlushBuckets {
-    type Result = Result<(), Vec<Bucket>>;
+    type Result = ();
 }

 /// Check whether the aggregator has not (yet) exceeded its total limits. Used for health checks.
@@ -1379,27 +1383,7 @@ impl<T: Iterator<Item = Bucket>> FusedIterator for CappedBucketIter<T> {}
 /// Internally, the aggregator maintains a continuous flush cycle every 100ms. It guarantees that
 /// all elapsed buckets belonging to the same [`ProjectKey`] are flushed together.
 ///
-/// Receivers must implement a handler for the [`FlushBuckets`] message:
-///
-/// ```
-/// use actix::prelude::*;
-/// use relay_metrics::{Bucket, FlushBuckets};
-///
-/// struct BucketReceiver;
-///
-/// impl Actor for BucketReceiver {
-///     type Context = Context<Self>;
-/// }
-///
-/// impl Handler<FlushBuckets> for BucketReceiver {
-///     type Result = Result<(), Vec<Bucket>>;
-///
-///     fn handle(&mut self, msg: FlushBuckets, _ctx: &mut Self::Context) -> Self::Result {
-///         // Return `Ok` to consume the buckets or `Err` to send them back
-///         Err(msg.buckets)
-///     }
-/// }
-/// ```
+/// Receivers must implement a handler for the [`FlushBuckets`] message.
 pub struct Aggregator {
     config: AggregatorConfig,
     buckets: HashMap<BucketKey, QueuedBucket>,
@@ -1778,11 +1762,12 @@ impl Aggregator {
         );
     }

-    /// Sends the [`FlushBuckets`] message to the receiver.
+    /// Sends the [`FlushBuckets`] message to the receiver in a fire-and-forget fashion. It is up
+    /// to the receiver to send the [`MergeBuckets`] message back if buckets could not be flushed
+    /// and need to be retried.
     ///
-    /// If the receiver returns buckets, they are merged back into the cache.
-    fn try_flush(&mut self, mut context: Option<&mut <Self as Actor>::Context>) {
+    /// If `force` is true, flush all buckets unconditionally and do not attempt to merge back.
+    fn try_flush(&mut self) {
         let flush_buckets = self.pop_flush_buckets();

         if flush_buckets.is_empty() {
@@ -1803,28 +1788,19 @@ impl Aggregator {
         let partitioned_buckets = self.partition_buckets(project_buckets, num_partitions);
         for (partition_key, buckets) in partitioned_buckets {
             self.process_batches(buckets, |batch| {
-                let fut = self
-                    .receiver
-                    .send(FlushBuckets {
-                        project_key,
-                        partition_key,
-                        buckets: batch,
-                    })
-                    .into_actor(self)
-                    .and_then(move |result, slf, _ctx| {
-                        if let Err(buckets) = result {
-                            relay_log::trace!(
-                                "returned {} buckets from receiver, merging back",
-                                buckets.len()
-                            );
-                            slf.merge_all(project_key, buckets).ok();
-                        }
-                        fut::ok(())
-                    })
-                    .drop_err();
-
-                if let Some(context) = context.as_deref_mut() {
-                    fut.spawn(context);
+                let batch_size = batch.len() as u64;
+                let result = self.receiver.do_send(FlushBuckets {
+                    project_key,
+                    partition_key,
+                    buckets: batch,
+                });
+                if let Err(err) = result {
+                    // remove the failed batch size from the total count, since it failed and
+                    // will be dropped
+                    total_bucket_count -= batch_size;
+                    relay_log::error!(
+                        "Failed to flush the buckets, dropping {batch_size} buckets: {err}."
+                    );
                 }
             });
         }
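With this change, try_flush no longer spawns a per-batch future and never awaits the receiver: do_send either enqueues the message or fails immediately, and a failed batch is dropped from the count. A receiver that still wants the old merge-back behavior must now send MergeBuckets itself. A minimal sketch of a receiver under the new contract, assuming the message fields are public as in the old doc example; forward_downstream is a hypothetical stand-in for real processing, while the message types and Aggregator::from_registry() all appear elsewhere in this commit:

    use actix::prelude::*;
    use relay_metrics::{Aggregator, Bucket, FlushBuckets, MergeBuckets};

    struct BucketReceiver;

    impl Actor for BucketReceiver {
        type Context = Context<Self>;
    }

    // Hypothetical downstream step: returns the buckets on failure so the
    // receiver can hand them back to the aggregator.
    fn forward_downstream(buckets: Vec<Bucket>) -> Result<(), Vec<Bucket>> {
        let _ = buckets; // pretend the sink accepted them
        Ok(())
    }

    impl Handler<FlushBuckets> for BucketReceiver {
        // Fire-and-forget: nothing travels back to the aggregator.
        type Result = ();

        fn handle(&mut self, msg: FlushBuckets, _ctx: &mut Self::Context) -> Self::Result {
            let project_key = msg.project_key;
            if let Err(buckets) = forward_downstream(msg.buckets) {
                // Where the old handler returned Err(buckets), the new
                // contract sends them back explicitly for a later retry.
                Aggregator::from_registry().do_send(MergeBuckets::new(project_key, buckets));
            }
        }
    }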
@@ -1854,8 +1830,8 @@ impl Actor for Aggregator {
         Controller::subscribe(ctx.address());

         // TODO: Consider a better approach than busy polling
-        ctx.run_interval(FLUSH_INTERVAL, |slf, context| {
-            slf.try_flush(Some(context));
+        ctx.run_interval(FLUSH_INTERVAL, |slf, _context| {
+            slf.try_flush();
         });
     }

@@ -1916,6 +1892,17 @@ impl InsertMetrics {
             metrics: metrics.into_iter().collect(),
         }
     }
+
+    /// Returns the `ProjectKey` for the current `InsertMetrics` message.
+    pub fn project_key(&self) -> ProjectKey {
+        self.project_key
+    }
+
+    /// Returns the list of the metrics in the current `InsertMetrics` message, consuming the
+    /// message itself.
+    pub fn metrics(self) -> Vec<Metric> {
+        self.metrics
+    }
 }

 impl Message for InsertMetrics {
@@ -1949,6 +1936,17 @@ impl MergeBuckets {
             buckets,
         }
     }
+
+    /// Returns the `ProjectKey` for the current `MergeBuckets` message.
+    pub fn project_key(&self) -> ProjectKey {
+        self.project_key
+    }
+
+    /// Returns the list of the buckets in the current `MergeBuckets` message, consuming the
+    /// message itself.
+    pub fn buckets(self) -> Vec<Bucket> {
+        self.buckets
+    }
 }

 impl Message for MergeBuckets {
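These accessors let code outside relay-metrics unpack the messages without touching private fields; processor.rs below switches from the project_cache definitions of InsertMetrics/MergeBuckets to these. An illustrative sketch of their use (the describe helper is hypothetical, not part of the diff):

    use relay_common::ProjectKey;
    use relay_metrics::{Bucket, MergeBuckets};

    // Illustrative only: unpack a MergeBuckets message via the new accessors.
    fn describe(msg: MergeBuckets) -> (ProjectKey, usize) {
        let project_key = msg.project_key(); // returns the key by value
        let buckets: Vec<Bucket> = msg.buckets(); // consumes the message
        (project_key, buckets.len())
    }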
@@ -2012,16 +2010,15 @@ mod tests {
     }

     impl Handler<FlushBuckets> for TestReceiver {
-        type Result = Result<(), Vec<Bucket>>;
+        type Result = ();

         fn handle(&mut self, msg: FlushBuckets, _ctx: &mut Self::Context) -> Self::Result {
             let buckets = msg.buckets;
             relay_log::debug!("received buckets: {:#?}", buckets);
             if self.reject_all {
-                return Err(buckets);
+                return;
             }
             self.add_buckets(buckets);
-            Ok(())
         }
     }

@@ -3063,7 +3060,7 @@ mod tests {
         let captures = relay_statsd::with_capturing_test_client(|| {
             aggregator.insert(project_key, metric1).ok();
             aggregator.insert(project_key, metric2).ok();
-            aggregator.try_flush(None);
+            aggregator.try_flush();
         });

         captures
2 changes: 2 additions & 0 deletions relay-metrics/src/lib.rs
@@ -55,6 +55,7 @@
 //!     "value": [36, 49, 57, 68],
 //!     "type": "d",
 //!     "timestamp": 1615889440,
+//!     "width": 10,
 //!     "tags": {
 //!       "route": "user_index"
 //!     }
@@ -64,6 +65,7 @@
 //!     "value": 4,
 //!     "type": "c",
 //!     "timestamp": 1615889440,
+//!     "width": 10,
 //!     "tags": {
 //!       "route": "user_index"
 //!     }
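A quick, self-contained check of the documented shape (this uses plain serde_json rather than relay's own bucket parser, and assumes a serde_json dependency):

    // Minimal check that the documented counter bucket JSON carries the new
    // `width` field alongside the other keys.
    fn main() {
        let json = r#"[{
            "name": "endpoint.hits",
            "value": 4,
            "type": "c",
            "timestamp": 1615889440,
            "width": 10,
            "tags": { "route": "user_index" }
        }]"#;
        let buckets: serde_json::Value = serde_json::from_str(json).unwrap();
        assert_eq!(buckets[0]["width"], 10);
        assert_eq!(buckets[0]["type"], "c");
    }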
29 changes: 17 additions & 12 deletions relay-server/src/actors/envelopes.rs
@@ -11,10 +11,10 @@ use relay_common::ProjectKey;
 use relay_config::{Config, HttpEncoding};
 use relay_general::protocol::ClientReport;
 use relay_log::LogError;
-use relay_metrics::Bucket;
+use relay_metrics::{Aggregator, Bucket, MergeBuckets};
 use relay_quotas::Scoping;
 use relay_statsd::metric;
-use relay_system::{Addr, AsyncResponse, FromMessage, NoResponse, Sender};
+use relay_system::{Addr, FromMessage, NoResponse};

 use crate::actors::outcome::{DiscardReason, Outcome};
 use crate::actors::processor::{EncodeEnvelope, EnvelopeProcessor};
@@ -176,7 +176,7 @@ pub struct SendMetrics {
 pub enum EnvelopeManager {
     SubmitEnvelope(Box<SubmitEnvelope>),
     SendClientReports(SendClientReports),
-    SendMetrics(SendMetrics, Sender<Result<(), Vec<Bucket>>>),
+    SendMetrics(SendMetrics),
 }

 impl EnvelopeManager {
@@ -204,10 +204,10 @@ impl FromMessage<SendClientReports> for EnvelopeManager {
 }

 impl FromMessage<SendMetrics> for EnvelopeManager {
-    type Response = AsyncResponse<Result<(), Vec<Bucket>>>;
+    type Response = NoResponse;

-    fn from_message(message: SendMetrics, sender: Sender<Result<(), Vec<Bucket>>>) -> Self {
-        Self::SendMetrics(message, sender)
+    fn from_message(message: SendMetrics, _: ()) -> Self {
+        Self::SendMetrics(message)
     }
 }

@@ -338,7 +338,7 @@ impl EnvelopeManagerService {
         }
     }

-    async fn handle_send_metrics(&self, message: SendMetrics) -> Result<(), Vec<Bucket>> {
+    async fn handle_send_metrics(&self, message: SendMetrics) {
         let SendMetrics {
             buckets,
             scoping,
@@ -361,9 +361,14 @@ impl EnvelopeManagerService {
         envelope.add_item(item);

         let partition_key = partition_key.map(|x| x.to_string());
-        self.submit_envelope(envelope, scoping, partition_key)
-            .await
-            .map_err(|_| buckets)
+        let result = self.submit_envelope(envelope, scoping, partition_key).await;
+        if let Err(err) = result {
+            relay_log::trace!(
+                "failed to submit the envelope, merging buckets back: {}",
+                err
+            );
+            Aggregator::from_registry().do_send(MergeBuckets::new(scoping.project_key, buckets))
+        }
     }

     async fn handle_send_client_reports(&self, message: SendClientReports) {
@@ -402,8 +407,8 @@ impl EnvelopeManagerService {
             EnvelopeManager::SendClientReports(message) => {
                 self.handle_send_client_reports(message).await;
             }
-            EnvelopeManager::SendMetrics(message, sender) => {
-                sender.send(self.handle_send_metrics(message).await);
+            EnvelopeManager::SendMetrics(message) => {
+                self.handle_send_metrics(message).await;
             }
         }
     }
4 changes: 2 additions & 2 deletions relay-server/src/actors/processor.rs
@@ -31,7 +31,7 @@ use relay_general::protocol::{
 use relay_general::store::{ClockDriftProcessor, LightNormalizationConfig};
 use relay_general::types::{Annotated, Array, FromValue, Object, ProcessingAction, Value};
 use relay_log::LogError;
-use relay_metrics::{Bucket, Metric};
+use relay_metrics::{Bucket, InsertMetrics, MergeBuckets, Metric};
 use relay_quotas::{DataCategory, RateLimits, ReasonCode};
 use relay_redis::RedisPool;
 use relay_sampling::{DynamicSamplingContext, RuleId};
@@ -41,7 +41,7 @@ use relay_system::{Addr, FromMessage, NoResponse, Service};
 use crate::actors::envelopes::{EnvelopeManager, SendEnvelope, SendEnvelopeError, SubmitEnvelope};
 use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome};
 use crate::actors::project::{Feature, ProjectState};
-use crate::actors::project_cache::{InsertMetrics, MergeBuckets, ProjectCache};
+use crate::actors::project_cache::ProjectCache;
 use crate::actors::upstream::{SendRequest, UpstreamRelay};
 use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
 use crate::metrics_extraction::sessions::{extract_session_metrics, SessionMetricsConfig};
8 changes: 3 additions & 5 deletions relay-server/src/actors/project.rs
@@ -17,7 +17,7 @@ use relay_filter::{matches_any_origin, FiltersConfig};
 use relay_general::pii::{DataScrubbingConfig, PiiConfig};
 use relay_general::store::{BreakdownsConfig, MeasurementsConfig};
 use relay_general::types::SpanAttribute;
-use relay_metrics::{self, Aggregator, Bucket, Metric};
+use relay_metrics::{self, Aggregator, Bucket, InsertMetrics, MergeBuckets, Metric};
 use relay_quotas::{Quota, RateLimits, Scoping};
 use relay_sampling::SamplingConfig;
 use relay_statsd::metric;
@@ -601,8 +601,7 @@ impl Project {
     /// The buckets will be keyed underneath this project key.
     pub fn merge_buckets(&mut self, buckets: Vec<Bucket>) {
         if self.metrics_allowed() {
-            Aggregator::from_registry()
-                .do_send(relay_metrics::MergeBuckets::new(self.project_key, buckets));
+            Aggregator::from_registry().do_send(MergeBuckets::new(self.project_key, buckets));
         }
     }

@@ -611,8 +610,7 @@ impl Project {
     /// The metrics will be keyed underneath this project key.
     pub fn insert_metrics(&mut self, metrics: Vec<Metric>) {
         if self.metrics_allowed() {
-            Aggregator::from_registry()
-                .do_send(relay_metrics::InsertMetrics::new(self.project_key, metrics));
+            Aggregator::from_registry().do_send(InsertMetrics::new(self.project_key, metrics));
         }
     }
