From e2f04d66fab7122161636203342dc89c0199eb35 Mon Sep 17 00:00:00 2001 From: jjbayer Date: Tue, 30 Mar 2021 10:47:43 +0200 Subject: [PATCH] feat(server): Send pre-aggregated metrics in batches (#966) Uses the metrics aggregator from #958 to send batches of pre-aggregated buckets to the upstream instead of forwarding individual metric values. If sending fails for any reason, the metrics are merged back into the aggregator, which will retry flushing after the next interval. Metric envelopes are not queued like regular envelopes. Instead, they go straight to the EventProcessor worker pool, where they are parsed, normalized and sent to the project's aggregator. This ensures that metric requests do not create long running futures that would slow down the system. For mixed envelopes, metric items are split off and handled separately. Metrics aggregators are spawned on the projects thread, which runs the project cache and manages all project state access. In the future, metrics aggregation will have to be moved to a separate resource to ensure that project state requests remain instant. --- CHANGELOG.md | 2 +- Cargo.lock | 1 + relay-config/Cargo.toml | 1 + relay-config/src/config.rs | 8 + relay-metrics/src/aggregation.rs | 173 ++++++- relay-metrics/src/lib.rs | 33 ++ relay-server/build.rs | 6 + relay-server/src/actors/events.rs | 476 ++++++++++++++------ relay-server/src/actors/project.rs | 238 +++++++++- relay-server/src/actors/project_cache.rs | 13 +- relay-server/src/actors/store.rs | 37 +- relay-server/src/endpoints/common.rs | 3 +- relay-server/src/envelope.rs | 14 +- relay-server/src/extractors/request_meta.rs | 27 +- relay-server/src/service.rs | 9 +- relay-server/src/utils/rate_limits.rs | 1 + tests/integration/fixtures/processing.py | 8 +- tests/integration/test_metrics.py | 69 ++- 18 files changed, 898 insertions(+), 221 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9ae45d7e7..0770564afd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ **Internal**: - Emit the `quantity` field for outcomes of events. This field describes the total size in bytes for attachments or the event count for all other categories. A separate outcome is emitted for attachments in a rejected envelope, if any, in addition to the event outcome. ([#942](https://github.com/getsentry/relay/pull/942)) -- Add experimental metrics ingestion with bucketing or pre-aggregation. ([#948](https://github.com/getsentry/relay/pull/948), [#952](https://github.com/getsentry/relay/pull/952), [#958](https://github.com/getsentry/relay/pull/958)) +- Add experimental metrics ingestion with bucketing and pre-aggregation. ([#948](https://github.com/getsentry/relay/pull/948), [#952](https://github.com/getsentry/relay/pull/952), [#958](https://github.com/getsentry/relay/pull/958), [#966](https://github.com/getsentry/relay/pull/966)) - Change HTTP response for upstream timeouts from 502 to 504. ([#859](https://github.com/getsentry/relay/pull/859)) - Add rule id to outcomes coming from transaction sampling. ([#953](https://github.com/getsentry/relay/pull/953)) - Add support for breakdowns ingestion. ([#934](https://github.com/getsentry/relay/pull/934)) diff --git a/Cargo.lock b/Cargo.lock index 80e37eb652..a9751a725f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3132,6 +3132,7 @@ dependencies = [ "relay-auth", "relay-common", "relay-log", + "relay-metrics", "relay-redis", "serde", "serde_json", diff --git a/relay-config/Cargo.toml b/relay-config/Cargo.toml index 93744bc3cc..5f036741a3 100644 --- a/relay-config/Cargo.toml +++ b/relay-config/Cargo.toml @@ -20,6 +20,7 @@ num_cpus = "1.13.0" relay-auth = { path = "../relay-auth" } relay-common = { path = "../relay-common" } relay-log = { path = "../relay-log", features = ["init"] } +relay-metrics = { path = "../relay-metrics" } relay-redis = { path = "../relay-redis" } serde = { version = "1.0.114", features = ["derive"] } serde_json = "1.0.55" diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 86d884ec67..8904476e9f 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -13,6 +13,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use relay_auth::{generate_key_pair, generate_relay_id, PublicKey, RelayId, SecretKey}; use relay_common::Uuid; +use relay_metrics::AggregatorConfig; use relay_redis::RedisConfig; use crate::byte_size::ByteSize; @@ -813,6 +814,8 @@ struct ConfigValues { processing: Processing, #[serde(default)] outcomes: Outcomes, + #[serde(default)] + aggregator: AggregatorConfig, } impl ConfigObject for ConfigValues { @@ -1421,6 +1424,11 @@ impl Config { pub fn max_rate_limit(&self) -> Option { self.values.processing.max_rate_limit.map(u32::into) } + + /// Returns configuration for the metrics [aggregator](relay_metrics::Aggregator). + pub fn aggregator_config(&self) -> AggregatorConfig { + self.values.aggregator.clone() + } } impl Default for Config { diff --git a/relay-metrics/src/aggregation.rs b/relay-metrics/src/aggregation.rs index fb9235f247..f0bc0eda03 100644 --- a/relay-metrics/src/aggregation.rs +++ b/relay-metrics/src/aggregation.rs @@ -13,7 +13,7 @@ use relay_common::{MonotonicResult, UnixTimestamp}; use crate::{Metric, MetricType, MetricValue}; /// The [aggregated value](Bucket::value) of a metric bucket. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(tag = "type", content = "value")] pub enum BucketValue { /// Aggregates [`MetricValue::Counter`] values by adding them into a single value. @@ -195,7 +195,7 @@ pub struct Bucket { /// A list of tags adding dimensions to the metric for filtering and aggregation. /// /// See [`Metric::tags`]. Every combination of tags results in a different bucket. - #[serde(skip_serializing_if = "BTreeMap::is_empty")] + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] pub tags: BTreeMap, } @@ -721,7 +721,170 @@ mod tests { } #[test] - fn test_merge_counters() { + fn test_parse_buckets() { + let json = r#"[ + { + "name": "endpoint.response_time", + "unit": "ms", + "value": [36, 49, 57, 68], + "type": "d", + "timestamp": 1615889440, + "tags": { + "route": "user_index" + } + } + ]"#; + + // TODO: This should parse the unit. + let buckets = Bucket::parse_all(json.as_bytes()).unwrap(); + insta::assert_debug_snapshot!(buckets, @r###" + [ + Bucket { + timestamp: UnixTimestamp(1615889440), + name: "endpoint.response_time", + value: Distribution( + [ + 36.0, + 49.0, + 57.0, + 68.0, + ], + ), + tags: { + "route": "user_index", + }, + }, + ] + "###); + } + + #[test] + fn test_parse_bucket_defaults() { + let json = r#"[ + { + "name": "endpoint.hits", + "value": 4, + "type": "c", + "timestamp": 1615889440 + } + ]"#; + + let buckets = Bucket::parse_all(json.as_bytes()).unwrap(); + insta::assert_debug_snapshot!(buckets, @r###" + [ + Bucket { + timestamp: UnixTimestamp(1615889440), + name: "endpoint.hits", + value: Counter( + 4.0, + ), + tags: {}, + }, + ] + "###); + } + + #[test] + fn test_buckets_roundtrip() { + let json = r#"[ + { + "timestamp": 1615889440, + "name": "endpoint.response_time", + "type": "d", + "value": [ + 36.0, + 49.0, + 57.0, + 68.0 + ], + "tags": { + "route": "user_index" + } + }, + { + "timestamp": 1615889440, + "name": "endpoint.hits", + "type": "c", + "value": 4.0, + "tags": { + "route": "user_index" + } + } +]"#; + + let buckets = Bucket::parse_all(json.as_bytes()).unwrap(); + let serialized = serde_json::to_string_pretty(&buckets).unwrap(); + assert_eq!(json, serialized); + } + + #[test] + fn test_bucket_value_merge_counter() { + let mut value = BucketValue::Counter(42.); + BucketValue::Counter(43.).merge_into(&mut value).unwrap(); + assert_eq!(value, BucketValue::Counter(85.)); + } + + #[test] + fn test_bucket_value_merge_distribution() { + let mut value = BucketValue::Distribution(vec![1., 2., 3.]); + BucketValue::Distribution(vec![2., 4.]) + .merge_into(&mut value) + .unwrap(); + // TODO: This should be ordered + assert_eq!(value, BucketValue::Distribution(vec![1., 2., 3., 2., 4.])); + } + + #[test] + fn test_bucket_value_merge_set() { + let mut value = BucketValue::Set(vec![1, 2].into_iter().collect()); + BucketValue::Set(vec![2, 3].into_iter().collect()) + .merge_into(&mut value) + .unwrap(); + assert_eq!(value, BucketValue::Set(vec![1, 2, 3].into_iter().collect())); + } + + #[test] + fn test_bucket_value_merge_gauge() { + let mut value = BucketValue::Gauge(42.); + BucketValue::Gauge(43.).merge_into(&mut value).unwrap(); + assert_eq!(value, BucketValue::Gauge(43.)); + } + + #[test] + fn test_bucket_value_insert_counter() { + let mut value = BucketValue::Counter(42.); + MetricValue::Counter(43.).merge_into(&mut value).unwrap(); + assert_eq!(value, BucketValue::Counter(85.)); + } + + #[test] + fn test_bucket_value_insert_distribution() { + let mut value = BucketValue::Distribution(vec![1., 2., 3.]); + MetricValue::Distribution(2.0) + .merge_into(&mut value) + .unwrap(); + // TODO: This should be ordered + assert_eq!(value, BucketValue::Distribution(vec![1., 2., 3., 2.])); + } + + #[test] + fn test_bucket_value_insert_set() { + let mut value = BucketValue::Set(vec![1, 2].into_iter().collect()); + MetricValue::Set(3).merge_into(&mut value).unwrap(); + assert_eq!(value, BucketValue::Set(vec![1, 2, 3].into_iter().collect())); + MetricValue::Set(2).merge_into(&mut value).unwrap(); + assert_eq!(value, BucketValue::Set(vec![1, 2, 3].into_iter().collect())); + } + + #[test] + fn test_bucket_value_insert_gauge() { + let mut value = BucketValue::Gauge(42.); + MetricValue::Gauge(43.).merge_into(&mut value).unwrap(); + assert_eq!(value, BucketValue::Gauge(43.)); + } + + #[test] + fn test_aggregator_merge_counters() { relay_test::setup(); let config = AggregatorConfig::default(); @@ -750,7 +913,7 @@ mod tests { } #[test] - fn test_merge_similar_timestamps() { + fn test_aggregator_merge_timestamps() { relay_test::setup(); let config = AggregatorConfig { bucket_interval: 10, @@ -801,7 +964,7 @@ mod tests { } #[test] - fn test_mixup_types() { + fn test_aggregator_mixup_types() { relay_test::setup(); let config = AggregatorConfig { bucket_interval: 10, diff --git a/relay-metrics/src/lib.rs b/relay-metrics/src/lib.rs index 2149926b17..4224781549 100644 --- a/relay-metrics/src/lib.rs +++ b/relay-metrics/src/lib.rs @@ -49,6 +49,39 @@ //! } //! ] //! ``` +//! +//! # Ingestion +//! +//! Processing Relays write aggregate buckets into the ingestion Kafka stream. The schema is similar +//! to the aggregation payload, with the addition of scoping information: +//! +//! ```json +//! [ +//! { +//! "org_id": 1, +//! "project_id": 42, +//! "name": "endpoint.response_time", +//! "unit": "ms", +//! "value": [36, 49, 57, 68], +//! "type": "d", +//! "timestamp": 1615889440, +//! "tags": { +//! "route": "user_index" +//! } +//! }, +//! { +//! "org_id": 1, +//! "project_id": 42, +//! "name": "endpoint.hits", +//! "value": 4, +//! "type": "c", +//! "timestamp": 1615889440, +//! "tags": { +//! "route": "user_index" +//! } +//! } +//! ] +//! ``` #![warn(missing_docs)] mod aggregation; diff --git a/relay-server/build.rs b/relay-server/build.rs index e700e6ad40..355ca05e32 100644 --- a/relay-server/build.rs +++ b/relay-server/build.rs @@ -14,6 +14,12 @@ fn main() { env::var("CARGO_PKG_VERSION").unwrap() ) .unwrap(); + writeln!( + f, + "pub const CLIENT: &str = \"sentry.relay/{}\";", + env::var("CARGO_PKG_VERSION").unwrap() + ) + .unwrap(); println!("cargo:rerun-if-changed=build.rs\n"); println!("cargo:rerun-if-changed=Cargo.toml\n"); } diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs index c6fc64f94a..a5c39dcb90 100644 --- a/relay-server/src/actors/events.rs +++ b/relay-server/src/actors/events.rs @@ -1,5 +1,6 @@ use std::cell::RefCell; use std::collections::BTreeMap; +use std::fmt; use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -8,7 +9,7 @@ use std::time::{Duration, Instant}; use actix::prelude::*; use chrono::{DateTime, Duration as SignedDuration, Utc}; use failure::Fail; -use futures::prelude::*; +use futures::{future, prelude::*}; use serde_json::Value as SerdeValue; use relay_common::{clone, metric, ProjectId, UnixTimestamp}; @@ -22,8 +23,8 @@ use relay_general::protocol::{ use relay_general::store::ClockDriftProcessor; use relay_general::types::{Annotated, Array, FromValue, Object, ProcessingAction, Value}; use relay_log::LogError; -use relay_metrics::Metric; -use relay_quotas::{DataCategory, RateLimits}; +use relay_metrics::{Bucket, InsertMetrics, MergeBuckets, Metric}; +use relay_quotas::{DataCategory, RateLimits, Scoping}; use relay_redis::RedisPool; use relay_sampling::{RuleId, SamplingResult}; @@ -34,6 +35,7 @@ use crate::actors::project::{ use crate::actors::project_cache::ProjectError; use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequestError}; use crate::envelope::{self, AttachmentType, ContentType, Envelope, Item, ItemType}; +use crate::extractors::{PartialDsn, RequestMeta}; use crate::http::{HttpError, RequestBuilder}; use crate::metrics::{RelayCounters, RelayHistograms, RelaySets, RelayTimers}; use crate::service::ServerError; @@ -309,47 +311,6 @@ impl EventProcessor { self } - /// Validates all metrics in the envelope, if any. - /// - /// Metrics are removed from the envelope if they contain invalid syntax or if their timestamps - /// are out of range after clock drift correction. - fn process_metrics(&self, state: &mut ProcessEnvelopeState) { - let envelope = &mut state.envelope; - let received = state.received_at; - // TODO(ja): Avoid unsafe cast - let timestamp = UnixTimestamp::from_secs(received.timestamp() as u64); - - let clock_drift_processor = - ClockDriftProcessor::new(envelope.sent_at(), received).at_least(MINIMUM_CLOCK_DRIFT); - - envelope.retain_items(|item| { - if item.ty() != ItemType::Metrics { - return true; - } - - let payload = item.payload(); - let mut normalized = Vec::with_capacity(payload.len()); - - for metric_result in Metric::parse_all(&payload, timestamp) { - let mut metric = match metric_result { - Ok(metric) => metric, - Err(_) => continue, - }; - - clock_drift_processor.process_timestamp(&mut metric.timestamp); - - if !normalized.is_empty() { - normalized.push(b'\n'); - } - normalized.extend(metric.serialize().as_bytes()); - } - - item.set_payload(ContentType::Text, normalized); - - true - }); - } - /// Validates all sessions in the envelope, if any. /// /// Sessions are removed from the envelope if they contain invalid JSON or if their timestamps @@ -775,6 +736,7 @@ impl EventProcessor { ItemType::Session => false, ItemType::Sessions => false, ItemType::Metrics => false, + ItemType::MetricBuckets => false, } } @@ -1065,7 +1027,7 @@ impl EventProcessor { // Fetch scoping again from the project state. This is a rather cheap operation at this // point and it is easier than passing scoping through all layers of `process_envelope`. - let scoping = project_state.get_scoping(state.envelope.meta()); + let scoping = project_state.scope_request(state.envelope.meta()); state.rate_limits = metric!(timer(RelayTimers::EventProcessingRateLimiting), { envelope_limiter @@ -1212,7 +1174,6 @@ impl EventProcessor { }; } - self.process_metrics(&mut state); self.process_sessions(&mut state); self.process_user_reports(&mut state); @@ -1319,6 +1280,93 @@ impl Handler for EventProcessor { } } +/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator. +/// +/// This parses and validates the metrics: +/// - For [`Metrics`](ItemType::Metrics), each metric is parsed separately, and invalid metrics are +/// ignored independently. +/// - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and +/// dropped together on parsing failure. +/// - Other items will be ignored with an error message. +/// +/// Additionally, processing applies clock drift correction using the system clock of this Relay, if +/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header. +struct ProcessMetrics { + /// A list of metric items. + pub items: Vec, + + /// The target project. + pub project: Addr, + + /// The instant at which the request was received. + pub start_time: Instant, + + /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift + /// correction. + pub sent_at: Option>, +} + +impl Message for ProcessMetrics { + type Result = (); +} + +impl Handler for EventProcessor { + type Result = (); + + fn handle(&mut self, message: ProcessMetrics, _context: &mut Self::Context) -> Self::Result { + let ProcessMetrics { + items, + project, + start_time, + sent_at, + } = message; + + let received = relay_common::instant_to_date_time(start_time); + let timestamp = UnixTimestamp::from_secs(received.timestamp() as u64); + + let clock_drift_processor = + ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT); + + for item in items { + let payload = item.payload(); + if item.ty() == ItemType::Metrics { + let metrics = Metric::parse_all(&payload, timestamp).filter_map(|result| { + let mut metric = result.ok()?; + clock_drift_processor.process_timestamp(&mut metric.timestamp); + Some(metric) + }); + + relay_log::trace!("inserting metrics into project aggregator"); + project.do_send(InsertMetrics::new(metrics)); + } else if item.ty() == ItemType::MetricBuckets { + if let Ok(mut buckets) = Bucket::parse_all(&payload) { + for bucket in &mut buckets { + clock_drift_processor.process_timestamp(&mut bucket.timestamp); + } + + relay_log::trace!("merging metric buckets into project aggregator"); + project.do_send(MergeBuckets::new(buckets)); + } + } else { + relay_log::error!( + "invalid item of type {} passed to ProcessMetrics", + item.ty() + ); + } + } + } +} + +/// Error returned from [`EventManager::send_envelope`]. +#[derive(Debug)] +enum SendEnvelopeError { + ScheduleFailed(MailboxError), + #[cfg(feature = "processing")] + StoreFailed(StoreError), + SendFailed(UpstreamRequestError), + RateLimited(RateLimits), +} + /// Either a captured envelope or an error that occured during processing. pub type CapturedEnvelope = Result; @@ -1394,6 +1442,107 @@ impl EventManager { outcome_producer, }) } + + /// Sends an envelope to the upstream or Kafka and handles returned rate limits. + fn send_envelope( + &mut self, + project: Addr, + mut envelope: Envelope, + scoping: Scoping, + #[allow(unused_variables)] start_time: Instant, + ) -> ResponseFuture<(), SendEnvelopeError> { + #[cfg(feature = "processing")] + { + if let Some(ref store_forwarder) = self.store_forwarder { + relay_log::trace!("sending envelope to kafka"); + let future = store_forwarder + .send(StoreEnvelope { + envelope, + start_time, + scoping, + }) + .map_err(SendEnvelopeError::ScheduleFailed) + .and_then(|result| result.map_err(SendEnvelopeError::StoreFailed)); + + return Box::new(future); + } + } + + // if we are in capture mode, we stash away the event instead of + // forwarding it. + if self.config.relay_mode() == RelayMode::Capture { + // XXX: this is wrong because captured_events does not take envelopes without + // event_id into account. + if let Some(event_id) = envelope.event_id() { + relay_log::debug!("capturing envelope"); + self.captures.insert(event_id, Ok(envelope)); + } else { + relay_log::debug!("dropping non event envelope"); + } + + return Box::new(future::ok(())); + } + + relay_log::trace!("sending envelope to sentry endpoint"); + let http_encoding = self.config.http_encoding(); + let request = SendRequest::post(format!("/api/{}/envelope/", scoping.project_id)).build( + move |mut builder: RequestBuilder| { + // Override the `sent_at` timestamp. Since the event went through basic + // normalization, all timestamps have been corrected. We propagate the new + // `sent_at` to allow the next Relay to double-check this timestamp and + // potentially apply correction again. This is done as close to sending as + // possible so that we avoid internal delays. + envelope.set_sent_at(Utc::now()); + + let meta = envelope.meta(); + + if let Some(origin) = meta.origin() { + builder.header("Origin", origin.as_str()); + } + + if let Some(user_agent) = meta.user_agent() { + builder.header("User-Agent", user_agent); + } + + builder + .content_encoding(http_encoding) + .header("X-Sentry-Auth", meta.auth_header()) + .header("X-Forwarded-For", meta.forwarded_for()) + .header("Content-Type", envelope::CONTENT_TYPE); + + builder + .body( + envelope + .to_vec() + // XXX: upstream actor should allow for custom error type, + // right now we are forced to shoehorn our envelope errors into + // UpstreamRequestError + .map_err(failure::Error::from) + .map_err(actix_web::Error::from) + .map_err(HttpError::Actix) + .map_err(UpstreamRequestError::Http)? + .into(), + ) + .map_err(UpstreamRequestError::Http) + }, + ); + + let future = self + .upstream + .send(request) + .map_err(SendEnvelopeError::ScheduleFailed) + .and_then(move |result| { + if let Err(UpstreamRequestError::RateLimited(upstream_limits)) = result { + let limits = upstream_limits.scope(&scoping); + project.do_send(UpdateRateLimits(limits.clone())); + Err(SendEnvelopeError::RateLimited(limits)) + } else { + result.map_err(SendEnvelopeError::SendFailed) + } + }); + + Box::new(future) + } } impl Actor for EventManager { @@ -1412,6 +1561,22 @@ impl Actor for EventManager { } } +/// Queues an envelope for processing. +/// +/// Depending on the items in the envelope, there are multiple outcomes: +/// +/// - Events and event related items, such as attachments, are always queued together. See +/// [`HandleEnvelope`] for a full description of how queued envelopes are processed by the +/// `EventManager`. +/// - Sessions and Session batches are always queued separately. If they occur in the same envelope +/// as an event, they are split off. +/// - Metrics are directly sent to the `EventProcessor`, bypassing the manager's queue and going +/// straight into metrics aggregation. See [`ProcessMetrics`] for a full description. +/// +/// Queueing can fail if the queue exceeds [`Config::event_buffer_size`]. In this case, `Err` is +/// returned and the envelope is not queued. Otherwise, this message responds with `Ok`. If it +/// contained an event-related item, such as an event payload or an attachment, this contains +/// `Some(EventId)`. pub struct QueueEnvelope { pub envelope: Envelope, pub project: Addr, @@ -1445,11 +1610,29 @@ impl Handler for EventManager { let event_id = message.envelope.event_id(); + // Remove metrics from the envelope and queue them directly on the project's `Aggregator`. + let mut metric_items = Vec::new(); + let is_metric = |i: &Item| matches!(i.ty(), ItemType::Metrics | ItemType::MetricBuckets); + while let Some(item) = message.envelope.take_item_by(is_metric) { + metric_items.push(item); + } + + if !metric_items.is_empty() { + relay_log::trace!("sending metrics into processing queue"); + self.processor.do_send(ProcessMetrics { + items: metric_items, + project: message.project.clone(), + start_time: message.start_time, + sent_at: message.envelope.sent_at(), + }); + } + // Split the envelope into event-related items and other items. This allows to fast-track: // 1. Envelopes with only session items. They only require rate limiting. // 2. Event envelope processing can bail out if the event is filtered or rate limited, // since all items depend on this event. if let Some(event_envelope) = message.envelope.split_by(Item::requires_event) { + relay_log::trace!("queueing separate envelope for non-event items"); self.current_active_events += 1; context.notify(HandleEnvelope { envelope: event_envelope, @@ -1459,23 +1642,38 @@ impl Handler for EventManager { }); } - self.current_active_events += 1; - context.notify(HandleEnvelope { - envelope: message.envelope, - sampling_project: message.sampling_project, - project: message.project, - start_time: message.start_time, - }); + if !message.envelope.is_empty() { + relay_log::trace!("queueing envelope"); + self.current_active_events += 1; + context.notify(HandleEnvelope { + envelope: message.envelope, + sampling_project: message.sampling_project, + project: message.project, + start_time: message.start_time, + }); + } // Actual event handling is performed asynchronously in a separate future. The lifetime of // that future will be tied to the EventManager's context. This allows to keep the Project // actor alive even if it is cleaned up in the ProjectManager. - relay_log::trace!("queued event"); Ok(event_id) } } +/// Handles a queued envelope. +/// +/// 1. Ensures the project state is up-to-date and then validates the envelope against the state and +/// cached rate limits. See [`CheckEnvelope`] for full information. +/// 2. Executes dynamic sampling using the sampling project. +/// 3. Runs the envelope through the [`EventProcessor`] worker pool, which parses items, applies +/// normalization, and runs filtering logic. +/// 4. Sends the envelope to the upstream or stores it in Kafka, depending on the +/// [`processing`](Config::processing_enabled) flag. +/// 5. Captures [`Outcome`]s for dropped items and envelopes. +/// +/// This operation is invoked by [`QueueEnvelope`] for envelopes containing all items except +/// metrics. struct HandleEnvelope { pub envelope: Envelope, pub project: Addr, @@ -1507,16 +1705,11 @@ impl Handler for EventManager { // being sent to the upstream (including delays in the upstream). This can be regarded // the total time an event spent in this relay, corrected by incoming network delays. - let upstream = self.upstream.clone(); let processor = self.processor.clone(); let outcome_producer = self.outcome_producer.clone(); let capture = self.config.relay_mode() == RelayMode::Capture; - let http_encoding = self.config.http_encoding(); let processing_enabled = self.config.processing_enabled(); - #[cfg(feature = "processing")] - let store_forwarder = self.store_forwarder.clone(); - let HandleEnvelope { envelope, project, @@ -1591,108 +1784,31 @@ impl Handler for EventManager { } })) .into_actor(self) - .and_then(clone!(scoping, is_received, |mut envelope, slf, _| { - #[cfg(feature = "processing")] - { - if let Some(store_forwarder) = store_forwarder { - relay_log::trace!("sending envelope to kafka"); - let future = store_forwarder - .send(StoreEnvelope { - envelope, - start_time, - scoping: *scoping.borrow(), - }) - .map_err(ProcessingError::ScheduleFailed) - .and_then(move |result| result.map_err(ProcessingError::StoreFailed)) - .into_actor(slf); - - return Box::new(future) as ResponseActFuture<_, _, _>; - } - } - - // if we are in capture mode, we stash away the event instead of - // forwarding it. - if capture { - // XXX: this is wrong because captured_events does not take envelopes without - // event_id into account. - if let Some(event_id) = event_id { - relay_log::debug!("capturing envelope"); - slf.captures.insert(event_id, Ok(envelope)); - } else { - relay_log::debug!("dropping non event envelope"); - } - return Box::new(fut::ok(())) as ResponseActFuture<_, _, _>; - } - - relay_log::trace!("sending event to sentry endpoint"); - let project_id = scoping.borrow().project_id; - let request = SendRequest::post(format!("/api/{}/envelope/", project_id)).build( - move |mut builder: RequestBuilder| { - // Override the `sent_at` timestamp. Since the event went through basic - // normalization, all timestamps have been corrected. We propagate the new - // `sent_at` to allow the next Relay to double-check this timestamp and - // potentially apply correction again. This is done as close to sending as - // possible so that we avoid internal delays. - envelope.set_sent_at(Utc::now()); - - let meta = envelope.meta(); - - if let Some(origin) = meta.origin() { - builder.header("Origin", origin.as_str()); - } - - if let Some(user_agent) = meta.user_agent() { - builder.header("User-Agent", user_agent); - } - - builder - .content_encoding(http_encoding) - .header("X-Sentry-Auth", meta.auth_header()) - .header("X-Forwarded-For", meta.forwarded_for()) - .header("Content-Type", envelope::CONTENT_TYPE); - - builder - .body( - envelope - .to_vec() - // XXX: upstream actor should allow for custom error type, - // right now we are forced to shoehorn our envelope errors into - // UpstreamRequestError - .map_err(failure::Error::from) - .map_err(actix_web::Error::from) - .map_err(HttpError::Actix) - .map_err(UpstreamRequestError::Http)? - .into(), - ) - .map_err(UpstreamRequestError::Http) - }, - ); - - let future = upstream - .send(request) - .map_err(ProcessingError::ScheduleFailed) - .and_then(move |result| { + .and_then(clone!(scoping, is_received, |envelope, slf, _| { + slf.send_envelope(project, envelope, *scoping.borrow(), start_time) + .then(move |result| { let received = match result { Ok(_) => true, - Err(ref e) => e.is_received(), + Err(SendEnvelopeError::RateLimited(_)) => true, + Err(SendEnvelopeError::SendFailed(ref e)) => e.is_received(), + Err(_) => false, }; // Flag that upstream has received the request, which will skip outcome // generation below. is_received.store(received, Ordering::SeqCst); - result.map_err(move |error| match error { - UpstreamRequestError::RateLimited(upstream_limits) => { - let limits = upstream_limits.scope(&scoping.borrow()); - project.do_send(UpdateRateLimits(limits.clone())); - ProcessingError::RateLimited(limits) + result.map_err(|error| match error { + SendEnvelopeError::ScheduleFailed(e) => { + ProcessingError::ScheduleFailed(e) } - other => ProcessingError::SendFailed(other), + #[cfg(feature = "processing")] + SendEnvelopeError::StoreFailed(e) => ProcessingError::StoreFailed(e), + SendEnvelopeError::SendFailed(e) => ProcessingError::SendFailed(e), + SendEnvelopeError::RateLimited(e) => ProcessingError::RateLimited(e), }) }) - .into_actor(slf); - - Box::new(future) as ResponseActFuture<_, _, _> + .into_actor(slf) })) .timeout(self.config.event_buffer_expiry(), ProcessingError::Timeout) .map(|_, _, _| metric!(counter(RelayCounters::EnvelopeAccepted) += 1)) @@ -1767,6 +1883,66 @@ impl Handler for EventManager { } } +/// Sends a batch of pre-aggregated metrics to the upstream or Kafka. +/// +/// Responds with `Err` if there was an error sending some or all of the buckets, containing the +/// failed buckets. +pub struct SendMetrics { + /// The pre-aggregated metric buckets. + pub buckets: Vec, + /// Scoping information for the metrics. + pub scoping: Scoping, + /// The project of the metrics. + pub project: Addr, +} + +impl fmt::Debug for SendMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct(std::any::type_name::()) + .field("buckets", &self.buckets) + .field("scoping", &self.scoping) + .field("project", &format_args!("Addr")) + .finish() + } +} + +impl Message for SendMetrics { + type Result = Result<(), Vec>; +} + +impl Handler for EventManager { + type Result = ResponseFuture<(), Vec>; + + fn handle(&mut self, message: SendMetrics, _context: &mut Self::Context) -> Self::Result { + let SendMetrics { + buckets, + scoping, + project, + } = message; + + let upstream = self.config.upstream_descriptor(); + let dsn = PartialDsn { + scheme: upstream.scheme(), + public_key: scoping.public_key, + host: upstream.host().to_owned(), + port: upstream.port(), + path: "".to_owned(), + project_id: Some(scoping.project_id), + }; + + let mut item = Item::new(ItemType::MetricBuckets); + item.set_payload(ContentType::Json, Bucket::serialize_all(&buckets).unwrap()); + let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn)); + envelope.add_item(item); + + let future = self + .send_envelope(project, envelope, scoping, Instant::now()) + .map_err(|_| buckets); + + Box::new(future) + } +} + /// Resolves a [`CapturedEnvelope`] by the given `event_id`. pub struct GetCapturedEnvelope { pub event_id: EventId, diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index bc5f62b4d3..abfa535995 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -3,6 +3,7 @@ use std::time::{Duration, Instant}; use actix::prelude::*; use chrono::{DateTime, Utc}; +use futures::future; use futures::{future::Shared, sync::oneshot, Future}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -15,9 +16,13 @@ use relay_config::Config; use relay_filter::{matches_any_origin, FiltersConfig}; use relay_general::pii::{DataScrubbingConfig, PiiConfig}; use relay_general::store::BreakdownsConfig; +use relay_metrics::{ + AggregateMetricsError, Aggregator, Bucket, FlushBuckets, InsertMetrics, MergeBuckets, +}; use relay_quotas::{Quota, RateLimits, Scoping}; use relay_sampling::SamplingConfig; +use crate::actors::events::{EventManager, SendMetrics}; use crate::actors::outcome::DiscardReason; use crate::actors::project_cache::{FetchProjectState, ProjectCache, ProjectError}; use crate::envelope::Envelope; @@ -271,12 +276,15 @@ impl ProjectState { } } - /// Returns `Scoping` information for this project state. + /// Amends request `Scoping` with information from this project state. /// /// This scoping amends `RequestMeta::get_partial_scoping` by adding organization and key info. /// The processor must fetch the full scoping before attempting to rate limit with partial /// scoping. - pub fn get_scoping(&self, meta: &RequestMeta) -> Scoping { + /// + /// To get the own scoping of this ProjectKey without amending request information, use + /// [`Project::scoping`] instead. + pub fn scope_request(&self, meta: &RequestMeta) -> Scoping { let mut scoping = meta.get_partial_scoping(); // The key configuration may be missing if the event has been queued for extended times and @@ -314,6 +322,31 @@ impl ProjectState { self.config.quotas.as_slice() } + /// Returns `Err` if the project is known to be invalid or disabled. + /// + /// If this project state is hard outdated, this returns `Ok(())`, instead, to avoid prematurely + /// dropping data. + pub fn check_disabled(&self, config: &Config) -> Result<(), DiscardReason> { + // if the state is out of date, we proceed as if it was still up to date. The + // upstream relay (or sentry) will still filter events. + if self.outdated(config) == Outdated::HardOutdated { + return Ok(()); + } + + // if we recorded an invalid project state response from the upstream (i.e. parsing + // failed), discard the event with a state reason. + if self.invalid() { + return Err(DiscardReason::ProjectState); + } + + // only drop events if we know for sure the project or key are disabled. + if self.disabled() { + return Err(DiscardReason::ProjectId); + } + + Ok(()) + } + /// Determines whether the given request should be accepted or discarded. /// /// Returns `Ok(())` if the request should be accepted. Returns `Err(DiscardReason)` if the @@ -340,21 +373,8 @@ impl ProjectState { return Err(DiscardReason::ProjectId); } - // if the state is out of date, we proceed as if it was still up to date. The - // upstream relay (or sentry) will still filter events. - - if self.outdated(config) != Outdated::HardOutdated { - // if we recorded an invalid project state response from the upstream (i.e. parsing - // failed), discard the event with a state reason. - if self.invalid() { - return Err(DiscardReason::ProjectState); - } - - // only drop events if we know for sure the project or key are disabled. - if self.disabled() { - return Err(DiscardReason::ProjectId); - } - } + // Check for invalid or disabled projects. + self.check_disabled(config)?; Ok(()) } @@ -408,6 +428,22 @@ impl StateChannel { } } +/// States for the metrics [`Aggregator`] within a [`Project`]. +/// +/// This allows to initialize an aggregator on demand and permanently disable it during project +/// state updates. +#[derive(Debug)] +enum AggregatorState { + /// The aggregator has not been initialized. + Unknown, + + /// The aggregator is initialized and available. + Available(Addr), + + /// The aggregator is disabled and metrics should be dropped. + Unavailable, +} + /// Actor representing organization and project configuration for a project key. /// /// This actor no longer uniquely identifies a project. Instead, it identifies a project key. @@ -416,6 +452,8 @@ pub struct Project { public_key: ProjectKey, config: Arc, manager: Addr, + event_manager: Addr, + aggregator: AggregatorState, state: Option>, state_channel: Option, rate_limits: RateLimits, @@ -423,11 +461,19 @@ pub struct Project { } impl Project { - pub fn new(key: ProjectKey, config: Arc, manager: Addr) -> Self { + /// Creates a new `Project` actor. + pub fn new( + key: ProjectKey, + config: Arc, + manager: Addr, + event_manager: Addr, + ) -> Self { Project { public_key: key, config, manager, + event_manager, + aggregator: AggregatorState::Unknown, state: None, state_channel: None, rate_limits: RateLimits::new(), @@ -435,10 +481,59 @@ impl Project { } } + /// Returns a reference to the project state if available. pub fn state(&self) -> Option<&ProjectState> { self.state.as_deref() } + /// Creates the aggregator if it is uninitialized and returns it. + /// + /// Returns `None` if the aggregator is permanently disabled, primarily for disabled projects. + fn get_or_create_aggregator( + &mut self, + context: &mut Context, + ) -> Option> { + if matches!(self.aggregator, AggregatorState::Unknown) { + let flush_receiver = context.address().recipient(); + let aggregator = Aggregator::new(self.config.aggregator_config(), flush_receiver); + // TODO: This starts the aggregator on the project arbiter, but we want a separate + // thread or thread pool for this. + self.aggregator = AggregatorState::Available(aggregator.start()); + } + + if let AggregatorState::Available(ref aggregator) = self.aggregator { + Some(aggregator.clone()) + } else { + None + } + } + + /// Updates the aggregator based on updates to the project state. + /// + /// Changes to the aggregator depend on the project state: + /// + /// 1. The project state is missing: In this case, the project has not been loaded, so the + /// aggregator remains unmodified. + /// 2. The project state is valid: Create an aggregator if it has previously been marked as + /// `Unavailable`, but leave it uninitialized otherwise. + /// 3. The project state is disabled or invalid: Mark the aggregator as `Unavailable`, + /// potentially removing an existing aggregator. + /// + /// If the aggregator is not stopped immediately. Existing requests can continue and the + /// aggregator will be stopped when the last reference drops. + fn update_aggregator(&mut self, context: &mut Context) { + let metrics_allowed = match self.state() { + Some(state) => state.check_disabled(&self.config).is_ok(), + None => return, + }; + + if metrics_allowed && matches!(self.aggregator, AggregatorState::Unavailable) { + self.get_or_create_aggregator(context); + } else if !metrics_allowed { + self.aggregator = AggregatorState::Unavailable; + } + } + fn get_or_fetch_state( &mut self, mut no_cache: bool, @@ -520,7 +615,7 @@ impl Project { no_cache, }) .into_actor(self) - .map(move |state_result, slf, _ctx| { + .map(move |state_result, slf, context| { let channel = match slf.state_channel.take() { Some(channel) => channel, None => return, @@ -535,6 +630,7 @@ impl Project { slf.state_channel = None; slf.state = state_result.map(|resp| resp.state).ok(); + slf.update_aggregator(context); if let Some(ref state) = slf.state { relay_log::debug!("project state {} updated", public_key); @@ -545,9 +641,30 @@ impl Project { .spawn(context); } - fn get_scoping(&mut self, meta: &RequestMeta) -> Scoping { + /// Creates `Scoping` for this project if the state is loaded. + /// + /// Returns `Some` if the project state has been fetched and contains a project identifier, + /// otherwise `None`. + fn scoping(&self) -> Option { + let state = self.state()?; + Some(Scoping { + organization_id: state.organization_id.unwrap_or(0), + project_id: state.project_id?, + public_key: self.public_key, + key_id: state + .get_public_key_config() + .and_then(|config| config.numeric_id), + }) + } + + /// Amends request `Scoping` with information from this project state. + /// + /// If the project state is loaded, information from the project state is merged into the + /// request's scoping. Otherwise, this function returns partial scoping from the `request_meta`. + /// See [`RequestMeta::get_partial_scoping`] for more information. + fn scope_request(&self, meta: &RequestMeta) -> Scoping { match self.state() { - Some(state) => state.get_scoping(meta), + Some(state) => state.scope_request(meta), None => meta.get_partial_scoping(), } } @@ -582,7 +699,7 @@ impl Project { } fn check_envelope_scoped(&mut self, message: CheckEnvelope) -> CheckEnvelopeResponse { - let scoping = self.get_scoping(message.envelope.meta()); + let scoping = self.scope_request(message.envelope.meta()); let result = self.check_envelope(message.envelope, &scoping); CheckEnvelopeResponse { result, scoping } } @@ -749,3 +866,80 @@ impl Handler for Project { self.rate_limits.merge(rate_limits); } } + +impl Handler for Project { + type Result = Result<(), AggregateMetricsError>; + + fn handle(&mut self, message: InsertMetrics, context: &mut Self::Context) -> Self::Result { + // Only keep if we have an aggregator, otherwise drop because we know that we were disabled. + if let Some(aggregator) = self.get_or_create_aggregator(context) { + aggregator.do_send(message); + } + + Ok(()) + } +} + +impl Handler for Project { + type Result = Result<(), AggregateMetricsError>; + + fn handle(&mut self, message: MergeBuckets, context: &mut Self::Context) -> Self::Result { + // Only keep if we have an aggregator, otherwise drop because we know that we were disabled. + if let Some(aggregator) = self.get_or_create_aggregator(context) { + aggregator.do_send(message); + } + + Ok(()) + } +} + +impl Handler for Project { + type Result = ResponseFuture<(), Vec>; + + fn handle(&mut self, message: FlushBuckets, context: &mut Self::Context) -> Self::Result { + let outdated = match self.state() { + Some(state) => state.outdated(&self.config), + None => Outdated::HardOutdated, + }; + + // Schedule an update to the project state if it is outdated, regardless of whether the + // metrics can be forwarded or not. We never wait for this update. + if outdated != Outdated::Updated { + self.get_or_fetch_state(false, context); + } + + // If the state is outdated, we need to wait for an updated state. Put them back into the + // aggregator and wait for the next flush cycle. + if outdated == Outdated::HardOutdated { + return Box::new(future::err(message.into_buckets())); + } + + let (state, scoping) = match (self.state(), self.scoping()) { + (Some(state), Some(scoping)) => (state, scoping), + _ => return Box::new(future::err(message.into_buckets())), + }; + + // Only send if the project state is valid, otherwise drop this bucket. + if state.check_disabled(&self.config).is_err() { + return Box::new(future::ok(())); + } + + let future = self + .event_manager + .send(SendMetrics { + buckets: message.into_buckets(), + scoping, + project: context.address(), + }) + .then(move |send_result| match send_result { + Ok(Ok(())) => Ok(()), + Ok(Err(buckets)) => Err(buckets), + Err(_) => { + relay_log::error!("dropped metric buckets: event manager mailbox full"); + Ok(()) + } + }); + + Box::new(future) + } +} diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs index 80cee42ca5..9799590836 100644 --- a/relay-server/src/actors/project_cache.rs +++ b/relay-server/src/actors/project_cache.rs @@ -12,6 +12,7 @@ use relay_common::{metric, ProjectKey}; use relay_config::{Config, RelayMode}; use relay_redis::RedisPool; +use crate::actors::events::EventManager; use crate::actors::project::{Project, ProjectState}; use crate::actors::project_local::LocalProjectSource; use crate::actors::project_upstream::UpstreamProjectSource; @@ -42,6 +43,7 @@ pub struct ProjectCache { config: Arc, projects: HashMap, + event_manager: Addr, local_source: Addr, upstream_source: Addr, #[cfg(feature = "processing")] @@ -51,6 +53,7 @@ pub struct ProjectCache { impl ProjectCache { pub fn new( config: Arc, + event_manager: Addr, upstream_relay: Addr, _redis: Option, ) -> Self { @@ -72,6 +75,7 @@ impl ProjectCache { config, projects: HashMap::new(), + event_manager, local_source, upstream_source, #[cfg(feature = "processing")] @@ -147,7 +151,14 @@ impl Handler for ProjectCache { } Entry::Vacant(entry) => { metric!(counter(RelayCounters::ProjectCacheMiss) += 1); - let project = Project::new(public_key, config, context.address()).start(); + let project = Project::new( + public_key, + config, + context.address(), + self.event_manager.clone(), + ) + .start(); + entry.insert(ProjectEntry { last_updated_at: Instant::now(), project: project.clone(), diff --git a/relay-server/src/actors/store.rs b/relay-server/src/actors/store.rs index 6a28b1474f..d9a8d44f5c 100644 --- a/relay-server/src/actors/store.rs +++ b/relay-server/src/actors/store.rs @@ -11,7 +11,7 @@ use failure::{Fail, ResultExt}; use rdkafka::error::KafkaError; use rdkafka::producer::BaseRecord; use rdkafka::ClientConfig; -use relay_metrics::{Metric, MetricUnit, MetricValue}; +use relay_metrics::{Bucket, BucketValue, MetricUnit}; use rmp_serde::encode::Error as RmpError; use serde::{ser::Error, Serialize}; @@ -317,19 +317,16 @@ impl StoreForwarder { ) -> Result<(), StoreError> { let payload = item.payload(); - // NB: Metrics are guaranteed to contain a timestamp at this point. - for metric_result in Metric::parse_all(&payload, UnixTimestamp::from_secs(0)) { - if let Ok(metric) = metric_result { - self.send_metric_message(MetricKafkaMessage { - org_id, - project_id, - name: metric.name, - unit: metric.unit, - value: metric.value, - timestamp: metric.timestamp, - tags: metric.tags, - })?; - } + for bucket in Bucket::parse_all(&payload).unwrap_or_default() { + self.send_metric_message(MetricKafkaMessage { + org_id, + project_id, + name: bucket.name, + unit: MetricUnit::default(), + value: bucket.value, + timestamp: bucket.timestamp, + tags: bucket.tags, + })?; } Ok(()) @@ -501,13 +498,13 @@ struct SessionKafkaMessage { struct MetricKafkaMessage { org_id: u64, project_id: ProjectId, - pub name: String, - pub unit: MetricUnit, + name: String, + unit: MetricUnit, #[serde(flatten)] - pub value: MetricValue, - pub timestamp: UnixTimestamp, + value: BucketValue, + timestamp: UnixTimestamp, #[serde(skip_serializing_if = "BTreeMap::is_empty")] - pub tags: BTreeMap, + tags: BTreeMap, } /// An enum over all possible ingest messages. @@ -634,7 +631,7 @@ impl Handler for StoreForwarder { item, )?; } - ItemType::Metrics => { + ItemType::MetricBuckets => { self.produce_metrics(scoping.organization_id, scoping.project_id, item)? } _ => {} diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 1f8aca3d91..822f4c8d11 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -357,8 +357,9 @@ fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> bool { } ItemType::Session => session_count += 1, ItemType::Sessions => session_count += 1, - ItemType::Metrics => (), ItemType::UserReport => (), + ItemType::Metrics => (), + ItemType::MetricBuckets => (), } } diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 9dda88cbe9..b5c1095d64 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -95,8 +95,10 @@ pub enum ItemType { Session, /// Aggregated session data. Sessions, - /// Metrics. + /// Individual metrics in text encoding. Metrics, + /// Buckets of preaggregated metrics encoded as JSON. + MetricBuckets, } impl ItemType { @@ -126,6 +128,7 @@ impl fmt::Display for ItemType { Self::Session => write!(f, "session"), Self::Sessions => write!(f, "aggregated sessions"), Self::Metrics => write!(f, "metrics"), + Self::MetricBuckets => write!(f, "metric buckets"), } } } @@ -527,9 +530,11 @@ impl Item { ItemType::FormData => false, // The remaining item types cannot carry event payloads. - ItemType::UserReport | ItemType::Session | ItemType::Sessions | ItemType::Metrics => { - false - } + ItemType::UserReport + | ItemType::Session + | ItemType::Sessions + | ItemType::Metrics + | ItemType::MetricBuckets => false, } } @@ -549,6 +554,7 @@ impl Item { ItemType::Session => false, ItemType::Sessions => false, ItemType::Metrics => false, + ItemType::MetricBuckets => false, } } } diff --git a/relay-server/src/extractors/request_meta.rs b/relay-server/src/extractors/request_meta.rs index 7437d6d10a..7b4e453c7d 100644 --- a/relay-server/src/extractors/request_meta.rs +++ b/relay-server/src/extractors/request_meta.rs @@ -61,12 +61,12 @@ impl ResponseError for BadEventMeta { /// transparently serializes to and deserializes from a DSN string. #[derive(Debug, Clone, Eq, PartialEq)] pub struct PartialDsn { - scheme: Scheme, - public_key: ProjectKey, - host: String, - port: u16, - path: String, - project_id: Option, + pub scheme: Scheme, + pub public_key: ProjectKey, + pub host: String, + pub port: u16, + pub path: String, + pub project_id: Option, } impl PartialDsn { @@ -251,6 +251,21 @@ impl RequestMeta { } impl RequestMeta { + /// Creates meta for an outbound request of this Relay. + pub fn outbound(dsn: PartialDsn) -> Self { + Self { + dsn, + client: Some(crate::constants::CLIENT.to_owned()), + version: default_version(), + origin: None, + remote_addr: None, + forwarded_for: "".to_string(), + user_agent: Some(crate::constants::SERVER.to_owned()), + no_cache: false, + start_time: Instant::now(), + } + } + #[cfg(test)] // TODO: Remove Dsn here? pub fn new(dsn: relay_common::Dsn) -> Self { diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 3dafc6a978..1cd61e4b50 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -139,8 +139,13 @@ impl ServiceState { .context(ServerErrorKind::ConfigError)? .start(); - let project_cache = - ProjectCache::new(config.clone(), upstream_relay.clone(), redis_pool).start(); + let project_cache = ProjectCache::new( + config.clone(), + event_manager.clone(), + upstream_relay.clone(), + redis_pool, + ) + .start(); Ok(ServiceState { config: config.clone(), diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 53370b6305..788e0b8982 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -96,6 +96,7 @@ fn infer_event_category(item: &Item) -> Option { ItemType::Session => None, ItemType::Sessions => None, ItemType::Metrics => None, + ItemType::MetricBuckets => None, ItemType::FormData => None, ItemType::UserReport => None, } diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index 182e5ca50f..005e644b0b 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -279,12 +279,14 @@ def sessions_consumer(kafka_consumer): @pytest.fixture def metrics_consumer(kafka_consumer): - return lambda: MetricsConsumer(*kafka_consumer("metrics")) + return lambda timeout=2: MetricsConsumer( + timeout=timeout, *kafka_consumer("metrics") + ) class MetricsConsumer(ConsumerBase): - def get_metric(self): - message = self.poll() + def get_metric(self, timeout=None): + message = self.poll(timeout=timeout) assert message is not None assert message.error() is None diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index c3ea693c59..289b35c020 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -1,8 +1,14 @@ from datetime import datetime, timezone +import json -def test_metrics(mini_sentry, relay_chain): - relay = relay_chain() +TEST_CONFIG = { + "aggregator": {"bucket_interval": 1, "initial_delay": 0, "debounce_delay": 0,} +} + + +def test_metrics(mini_sentry, relay): + relay = relay(mini_sentry, options=TEST_CONFIG) project_id = 42 mini_sentry.add_basic_project_config(project_id) @@ -11,18 +17,21 @@ def test_metrics(mini_sentry, relay_chain): metrics_payload = f"foo:42|c|'{timestamp}\nbar:17|c|'{timestamp}" relay.send_metrics(project_id, metrics_payload) - envelope = mini_sentry.captured_events.get(timeout=1) + envelope = mini_sentry.captured_events.get(timeout=2) assert len(envelope.items) == 1 metrics_item = envelope.items[0] - assert metrics_item.type == "metrics" + assert metrics_item.type == "metric_buckets" received_metrics = metrics_item.get_bytes() - assert received_metrics.decode() == metrics_payload + assert json.loads(received_metrics.decode()) == [ + {"timestamp": timestamp, "name": "foo", "value": 42.0, "type": "c"}, + {"timestamp": timestamp, "name": "bar", "value": 17.0, "type": "c"}, + ] def test_metrics_with_processing(mini_sentry, relay_with_processing, metrics_consumer): - relay = relay_with_processing() + relay = relay_with_processing(options=TEST_CONFIG) metrics_consumer = metrics_consumer() project_id = 42 @@ -43,3 +52,51 @@ def test_metrics_with_processing(mini_sentry, relay_with_processing, metrics_con "type": "c", "timestamp": timestamp, } + + metric = metrics_consumer.get_metric() + + assert metric == { + "org_id": 1, + "project_id": project_id, + "name": "bar", + "unit": "", + "value": 17.0, + "type": "c", + "timestamp": timestamp, + } + + +def test_metrics_full(mini_sentry, relay, relay_with_processing, metrics_consumer): + metrics_consumer = metrics_consumer() + + upstream_config = { + "aggregator": { + "bucket_interval": 1, + "initial_delay": 2, # Give upstream some time to process downstream entries: + "debounce_delay": 0, + } + } + upstream = relay_with_processing(options=upstream_config) + + downstream = relay(upstream, options=TEST_CONFIG) + + # Create project config + project_id = 42 + mini_sentry.add_full_project_config(project_id) + + # Send two events to downstream and one to upstream + downstream.send_metrics(project_id, "foo:7|c") + downstream.send_metrics(project_id, "foo:5|c") + + upstream.send_metrics(project_id, "foo:3|c") + + metric = metrics_consumer.get_metric(timeout=4) + metric.pop("timestamp") + assert metric == { + "org_id": 1, + "project_id": project_id, + "name": "foo", + "unit": "", + "value": 15.0, + "type": "c", + }