diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d0744c94f..1ba16664d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,9 +21,9 @@ - Revert back the addition of metric names as tag on Sentry errors when relay drops metrics. ([#1873](https://github.com/getsentry/relay/pull/1873)) - Tag the dynamic sampling decision on `count_per_root_project` to measure effective sample rates. ([#1870](https://github.com/getsentry/relay/pull/1870)) - Deprecate fields on the profiling sample format. ([#1878](https://github.com/getsentry/relay/pull/1878)) -- Move the pending envelopes buffering into the project cache. ([#1879](https://github.com/getsentry/relay/pull/1879)) - Remove idle samples at the start and end of a profile and useless metadata. ([#1894](https://github.com/getsentry/relay/pull/1894)) + ## 23.2.0 **Features**: diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index 6cf0dbd154..aba6a626dd 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -1,3 +1,4 @@ +use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; @@ -19,15 +20,16 @@ use relay_system::BroadcastChannel; use crate::actors::envelopes::{EnvelopeManager, SendMetrics}; use crate::actors::outcome::{DiscardReason, Outcome}; -#[cfg(feature = "processing")] -use crate::actors::processor::EnvelopeProcessor; -use crate::actors::project_cache::{CheckedEnvelope, ProjectCache, RequestUpdate}; +use crate::actors::processor::{EnvelopeProcessor, ProcessEnvelope}; +use crate::actors::project_cache::{ + AddSamplingState, CheckedEnvelope, ProjectCache, RequestUpdate, +}; use crate::envelope::Envelope; use crate::extractors::RequestMeta; use crate::service::Registry; use crate::statsd::RelayCounters; -use crate::utils::{EnvelopeContext, EnvelopeLimiter, MetricsLimiter, RetryBackoff}; +use crate::utils::{self, EnvelopeContext, EnvelopeLimiter, MetricsLimiter, RetryBackoff}; #[cfg(feature = "processing")] use crate::actors::processor::RateLimitFlushBuckets; @@ -397,6 +399,8 @@ pub struct Project { config: Arc, state: Option>, state_channel: Option, + pending_validations: VecDeque<(Box, EnvelopeContext)>, + pending_sampling: VecDeque, rate_limits: RateLimits, last_no_cache: Instant, } @@ -412,6 +416,8 @@ impl Project { config, state: None, state_channel: None, + pending_validations: VecDeque::new(), + pending_sampling: VecDeque::new(), rate_limits: RateLimits::new(), last_no_cache: Instant::now(), } @@ -447,7 +453,7 @@ impl Project { /// Returns the project state if it is not expired. /// /// Convenience wrapper around [`expiry_state`](Self::expiry_state). - pub fn valid_state(&self) -> Option> { + fn valid_state(&self) -> Option> { match self.expiry_state() { ExpiryState::Updated(state) => Some(state), ExpiryState::Stale(state) => Some(state), @@ -636,9 +642,93 @@ impl Project { self.get_cached_state(no_cache); } + /// Validates the envelope and submits the envelope to the next stage. + /// + /// If this project is disabled or rate limited, corresponding items are dropped from the + /// envelope. Remaining items in the Envelope are forwarded: + /// - If the envelope needs dynamic sampling, this sends [`AddSamplingState`] to the + /// [`ProjectCache`] to add the required project state. + /// - Otherwise, the envelope is directly submitted to the [`EnvelopeProcessor`]. + fn flush_validation( + &mut self, + envelope: Box, + envelope_context: EnvelopeContext, + project_state: Arc, + ) { + if let Ok(checked) = self.check_envelope(envelope, envelope_context) { + if let Some((envelope, envelope_context)) = checked.envelope { + let mut process = ProcessEnvelope { + envelope, + envelope_context, + project_state, + sampling_project_state: None, + }; + + if let Some(sampling_key) = utils::get_sampling_key(&process.envelope) { + let own_key = process + .project_state + .get_public_key_config() + .map(|c| c.public_key); + + if Some(sampling_key) == own_key { + process.sampling_project_state = Some(process.project_state.clone()); + EnvelopeProcessor::from_registry().send(process); + } else { + ProjectCache::from_registry() + .send(AddSamplingState::new(sampling_key, process)); + } + } else { + EnvelopeProcessor::from_registry().send(process); + } + } + } + } + + /// Enqueues an envelope for validation. + /// + /// If the project state is up to date, the message will be immediately sent to the next stage. + /// Otherwise, this queues the envelope and flushes it when the project has been updated. + /// + /// This method will trigger an update of the project state internally if the state is stale or + /// outdated. + pub fn enqueue_validation(&mut self, envelope: Box, context: EnvelopeContext) { + match self.get_cached_state(envelope.meta().no_cache()) { + Some(state) if !state.invalid() => self.flush_validation(envelope, context, state), + _ => self.pending_validations.push_back((envelope, context)), + } + } + + /// Adds the project state for dynamic sampling and submits the Envelope for processing. + fn flush_sampling(&self, mut message: ProcessEnvelope) { + // Intentionally ignore all errors. Fallback sampling behavior applies in this case. + if let Some(state) = self.valid_state().filter(|state| !state.invalid()) { + // Never use rules from another organization. + if state.organization_id == message.project_state.organization_id { + message.sampling_project_state = Some(state); + } + } + + EnvelopeProcessor::from_registry().send(message); + } + + /// Enqueues an envelope for adding a dynamic sampling project state. + /// + /// If the project state is up to date, the message will be immediately submitted for + /// processing. Otherwise, this queues the envelope and flushes it when the project has been + /// updated. + /// + /// This method will trigger an update of the project state internally if the state is stale or + /// outdated. + pub fn enqueue_sampling(&mut self, message: ProcessEnvelope) { + match self.get_cached_state(message.envelope.meta().no_cache()) { + Some(_) => self.flush_sampling(message), + None => self.pending_sampling.push_back(message), + } + } + /// Replaces the internal project state with a new one and triggers pending actions. /// - /// This flushes pending envelopes from [`ValidateEnvelope`] and + /// This flushes pending envelopes from [`ValidateEnvelope`] and [`AddSamplingState`] and /// notifies all pending receivers from [`get_state`](Self::get_state). /// /// `no_cache` should be passed from the requesting call. Updates with `no_cache` will always @@ -685,6 +775,16 @@ impl Project { return; } + // Flush all queued `ValidateEnvelope` messages + while let Some((envelope, context)) = self.pending_validations.pop_front() { + self.flush_validation(envelope, context, state.clone()); + } + + // Flush all queued `AddSamplingState` messages + while let Some(message) = self.pending_sampling.pop_front() { + self.flush_sampling(message); + } + // Flush all waiting recipients. relay_log::debug!("project state {} updated", self.project_key); channel.inner.send(state); @@ -821,6 +921,18 @@ impl Project { } } +impl Drop for Project { + fn drop(&mut self) { + let count = self.pending_validations.len() + self.pending_sampling.len(); + if count > 0 { + relay_log::with_scope( + |scope| scope.set_tag("project_key", self.project_key), + || relay_log::error!("dropped project with {} envelopes", count), + ); + } + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs index 4d8f9fbd61..3133c0e21a 100644 --- a/relay-server/src/actors/project_cache.rs +++ b/relay-server/src/actors/project_cache.rs @@ -1,4 +1,3 @@ -use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; use tokio::sync::mpsc; @@ -13,7 +12,7 @@ use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, Sender, Service}; use crate::actors::outcome::DiscardReason; -use crate::actors::processor::{EnvelopeProcessor, ProcessEnvelope}; +use crate::actors::processor::ProcessEnvelope; use crate::actors::project::{Project, ProjectSender, ProjectState}; use crate::actors::project_local::{LocalProjectSource, LocalProjectSourceService}; use crate::actors::project_upstream::{UpstreamProjectSource, UpstreamProjectSourceService}; @@ -132,12 +131,11 @@ impl CheckEnvelope { /// [`CheckEnvelope`]. Once the envelope has been validated, remaining items are forwarded to the /// next stage: /// -/// - If the envelope needs dynamic sampling, and the project state is not cached or out of the -/// date, the envelopes is spooled and we continue when the state is fetched. +/// - If the envelope needs dynamic sampling, this sends [`AddSamplingState`] to the +/// [`ProjectCache`] to add the required project state. /// - Otherwise, the envelope is directly submitted to the [`EnvelopeProcessor`]. /// /// [`EnvelopeProcessor`]: crate::actors::processor::EnvelopeProcessor -#[derive(Debug)] pub struct ValidateEnvelope { envelope: Box, context: EnvelopeContext, @@ -149,6 +147,27 @@ impl ValidateEnvelope { } } +/// Adds the project state for dynamic sampling and sends the envelope to processing. +/// +/// If the project state is up to date, the envelope will be immediately submitted for processing. +/// Otherwise, this queues the envelope and flushes it when the project has been updated. +/// +/// This message will trigger an update of the project state internally if the state is stale or +/// outdated. +pub struct AddSamplingState { + project_key: ProjectKey, + message: ProcessEnvelope, +} + +impl AddSamplingState { + pub fn new(project_key: ProjectKey, message: ProcessEnvelope) -> Self { + Self { + project_key, + message, + } + } +} + pub struct UpdateRateLimits { project_key: ProjectKey, rate_limits: RateLimits, @@ -187,6 +206,7 @@ pub enum ProjectCache { Sender>, ), ValidateEnvelope(ValidateEnvelope), + AddSamplingState(AddSamplingState), UpdateRateLimits(UpdateRateLimits), InsertMetrics(InsertMetrics), MergeBuckets(MergeBuckets), @@ -247,6 +267,14 @@ impl FromMessage for ProjectCache { } } +impl FromMessage for ProjectCache { + type Response = relay_system::NoResponse; + + fn from_message(message: AddSamplingState, _: ()) -> Self { + Self::AddSamplingState(message) + } +} + impl FromMessage for ProjectCache { type Response = relay_system::NoResponse; @@ -372,86 +400,6 @@ struct UpdateProjectState { no_cache: bool, } -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] -struct QueueKey { - own_key: ProjectKey, - sampling_key: ProjectKey, -} - -impl QueueKey { - fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self { - Self { - own_key, - sampling_key, - } - } -} - -/// The queue (buffer) of the incoming envelopes. -#[derive(Debug, Default)] -struct Queue { - /// Contains the cache of the incoming envelopes. - buffer: BTreeMap, EnvelopeContext)>>, - /// Index of the buffered project keys. - index: BTreeMap>, -} - -impl Queue { - /// Creates an empty queue. - pub fn new() -> Self { - Self::default() - } - - /// Adds the value to the queue for the provided key. - pub fn enqueue(&mut self, key: QueueKey, value: (Box, EnvelopeContext)) { - self.index.entry(key.own_key).or_default().insert(key); - self.index.entry(key.sampling_key).or_default().insert(key); - self.buffer.entry(key).or_default().push(value); - } - - /// Returns the list of buffered envelopes if they satisfy a predicate. - pub fn dequeue

( - &mut self, - partial_key: &ProjectKey, - predicate: P, - ) -> Vec<(Box, EnvelopeContext)> - where - P: Fn(&QueueKey) -> bool, - { - let mut result = Vec::new(); - - let mut queue_keys = self.index.remove(partial_key).unwrap_or_default(); - let mut index = BTreeSet::new(); - - while let Some(queue_key) = queue_keys.pop_first() { - // Find those keys which match predicates and return keys into the index, where - // predicate is failing. - if predicate(&queue_key) { - if let Some(envelopes) = self.buffer.remove(&queue_key) { - result.extend(envelopes); - } - } else { - index.insert(queue_key); - } - } - - if !index.is_empty() { - self.index.insert(*partial_key, index); - } - - result - } -} - -impl Drop for Queue { - fn drop(&mut self) { - let count: usize = self.buffer.values().map(|v| v.len()).sum(); - if count > 0 { - relay_log::error!("dropped queue with {} envelopes", count); - } - } -} - /// Main broker of the [`ProjectCacheService`]. /// /// This handles incoming public messages, merges resolved project states, and maintains the actual @@ -464,7 +412,6 @@ struct ProjectCacheBroker { garbage_disposal: GarbageDisposal, source: ProjectSource, state_tx: mpsc::UnboundedSender, - pending_envelopes: Queue, } impl ProjectCacheBroker { @@ -484,15 +431,7 @@ impl ProjectCacheBroker { // Defer dropping the projects to a dedicated thread: let mut count = 0; - for (project_key, project) in expired { - // Dequeue all the envelopes linked to the disposable project, which will be dropped - // once this for loop exits with an `Invalid(Internal)` outcome. - let envelopes = self.pending_envelopes.dequeue(&project_key, |_| true); - relay_log::with_scope( - |scope| scope.set_tag("project_key", project_key), - || relay_log::error!("evicted project with {} envelopes", envelopes.len()), - ); - + for (_, project) in expired { self.garbage_disposal.dispose(project); count += 1; } @@ -500,7 +439,7 @@ impl ProjectCacheBroker { // Log garbage queue size: let queue_size = self.garbage_disposal.queue_size() as f64; - metric!(gauge(RelayGauges::ProjectCacheGarbageQueueSize) = queue_size); + relay_statsd::metric!(gauge(RelayGauges::ProjectCacheGarbageQueueSize) = queue_size); metric!(timer(RelayTimers::ProjectStateEvictionDuration) = eviction_start.elapsed()); } @@ -521,10 +460,6 @@ impl ProjectCacheBroker { }) } - /// Updates the [`Project`] with received [`ProjectState`]. - /// - /// If the project state is valid, the internal `pending_envelopes` queue is also checked if - /// there are any envelopes buffered for this specific project, which could be processed now. fn merge_state(&mut self, message: UpdateProjectState) { let UpdateProjectState { project_key, @@ -533,38 +468,7 @@ impl ProjectCacheBroker { } = message; self.get_or_create_project(project_key) - .update_state(state.clone(), no_cache); - - // Envelopes need to remain in the queue while Relay receives invalid states from upstream. - if state.invalid() { - return; - } - - let envelopes = self.pending_envelopes.dequeue(&project_key, |queue_key| { - let partial_key = if queue_key.own_key == project_key { - queue_key.sampling_key - } else { - queue_key.own_key - }; - - // We return false if project is not cached or its state is invalid, true otherwise. - // We only have to check `partial_key`, because we already know that the `project_key`s `state` - // is valid and loaded. - self.projects - .get(&partial_key) - // Make sure we have only cached and valid state. - .and_then(|p| p.valid_state()) - .map_or(false, |s| !s.invalid()) - }); - - // Flush envelopes where both states have resolved. - for (envelope, envelope_context) in envelopes { - let sampling_state = utils::get_sampling_key(&envelope) - .and_then(|key| self.projects.get(&key)) - .and_then(|p| p.valid_state()); - - self.handle_processing(state.clone(), sampling_state, envelope, envelope_context); - } + .update_state(state, no_cache) } fn handle_request_update(&mut self, message: RequestUpdate) { @@ -615,90 +519,30 @@ impl ProjectCacheBroker { &mut self, message: CheckEnvelope, ) -> Result { - let CheckEnvelope { envelope, context } = message; - let project = self.get_or_create_project(envelope.meta().public_key()); + let project = self.get_or_create_project(message.envelope.meta().public_key()); + // Preload the project cache so that it arrives a little earlier in processing. However, // do not pass `no_cache`. In case the project is rate limited, we do not want to force // a full reload. Fetching must not block the store request. project.prefetch(false); - project.check_envelope(envelope, context) - } - - /// Handles the processing of the provided envelope. - fn handle_processing( - &mut self, - state: Arc, - sampling_state: Option>, - envelope: Box, - envelope_context: EnvelopeContext, - ) { - let project_key = envelope.meta().public_key(); - // The `Envelope` and `EnvelopeContext` will be dropped if the `Project::check_envelope()` - // function returns any error, which will also be ignored here. - let Some(Ok(checked)) = self - .projects - .get_mut(&project_key) - .map(|p| p.check_envelope(envelope, envelope_context)) else { return; }; - - if let Some((envelope, envelope_context)) = checked.envelope { - let mut process = ProcessEnvelope { - envelope, - envelope_context, - project_state: state.clone(), - sampling_project_state: None, - }; - if let Some(sampling_state) = sampling_state { - if state.organization_id == sampling_state.organization_id { - process.sampling_project_state = Some(sampling_state) - } - } - - EnvelopeProcessor::from_registry().send(process); - } + project.check_envelope(message.envelope, message.context) } - /// Checks an incoming envelope and decides either process it immediately or buffer it. - /// - /// Few conditions are checked here: - /// - If there is no dynamic sampling key and the project is already cached, we do straight to - /// processing otherwise buffer the envelopes. - /// - If the dynamic sampling key is provided and if the root and sampling projects - /// are cached - process the envelope, buffer otherwise. - /// - /// This means if the caches are hot we always process all the incoming envelopes without any - /// delay. But in case the project state cannot be fetched, we keep buffering till the state - /// is eventually updated. - /// - /// The flushing of the buffered envelopes happens in `update_state`. fn handle_validate_envelope(&mut self, message: ValidateEnvelope) { - let ValidateEnvelope { envelope, context } = message; - - // Fetch the project state for our key and make sure it's not invalid. - let own_key = envelope.meta().public_key(); - let project_state = self - .get_or_create_project(own_key) - .get_cached_state(envelope.meta().no_cache()) - .filter(|st| !st.invalid()); - - // Also, fetch the project state for sampling key and make sure it's not invalid. - let sampling_key = utils::get_sampling_key(&envelope); - let sampling_state = sampling_key.and_then(|key| { - self.get_or_create_project(key) - .get_cached_state(envelope.meta().no_cache()) - .filter(|st| !st.invalid()) - }); - - // Trigger processing once we have a project state and we either have a sampling project - // state or we do not need one. - if let Some(state) = project_state { - if sampling_state.is_some() || sampling_key.is_none() { - return self.handle_processing(state, sampling_state, envelope, context); - } + // Preload the project cache for dynamic sampling in parallel to the main one. + if let Some(sampling_key) = utils::get_sampling_key(&message.envelope) { + self.get_or_create_project(sampling_key) + .prefetch(message.envelope.meta().no_cache()); } - let key = QueueKey::new(own_key, sampling_key.unwrap_or(own_key)); - self.pending_envelopes.enqueue(key, (envelope, context)); + self.get_or_create_project(message.envelope.meta().public_key()) + .enqueue_validation(message.envelope, message.context); + } + + fn handle_add_sampling_state(&mut self, message: AddSamplingState) { + self.get_or_create_project(message.project_key) + .enqueue_sampling(message.message); } fn handle_rate_limits(&mut self, message: UpdateRateLimits) { @@ -734,6 +578,7 @@ impl ProjectCacheBroker { sender.send(self.handle_check_envelope(message)) } ProjectCache::ValidateEnvelope(message) => self.handle_validate_envelope(message), + ProjectCache::AddSamplingState(message) => self.handle_add_sampling_state(message), ProjectCache::UpdateRateLimits(message) => self.handle_rate_limits(message), ProjectCache::InsertMetrics(message) => self.handle_insert_metrics(message), ProjectCache::MergeBuckets(message) => self.handle_merge_buckets(message), @@ -777,7 +622,6 @@ impl Service for ProjectCacheService { garbage_disposal: GarbageDisposal::new(), source: ProjectSource::start(config, redis), state_tx, - pending_envelopes: Queue::new(), }; loop { diff --git a/tests/integration/test_query.py b/tests/integration/test_query.py index dc4e1854bc..2c23cf18c5 100644 --- a/tests/integration/test_query.py +++ b/tests/integration/test_query.py @@ -18,7 +18,7 @@ def test_local_project_config(mini_sentry, relay): project_id = 42 config = mini_sentry.basic_project_config(project_id) relay_config = { - "cache": {"file_interval": 1, "project_expiry": 1, "project_grace_period": 0} + "cache": {"file_interval": 1, "project_expiry": 0, "project_grace_period": 0} } relay = relay(mini_sentry, relay_config, wait_health_check=False) relay.config_dir.mkdir("projects").join("42.json").write(