Skip to content

Commit

Permalink
ref(cache): Move buffering of pending envelope to ProjectCache (#1879)
Browse files Browse the repository at this point in the history
These changes moving buffering of the incoming envelopes in the
`ProjectCache`.

Current implementation still keeps, so called queue in memory and using
`HashMap` with a composite key `QueueKey {key, sampling_key}`, where
`sampling_key` can be the same as a key if there is no sampling project
identified. The values to these keys are `Vec` of boxed `Envelope` with
their `EnvelopeContext`.

Once we get an update for project state, we check all variants of
`QueueKey` which contains the current `ProjectKey` and if all the
project states are cached we try to flush buffered envelopes indexed by
these `QeueuKey`.

The envelops will be buffered if:
* the project state is not fetched yet
* root project is here but the sampling project state is not fetched yet
* the sampling project state is here but the root project is not fetched
yet


This change also removes all the buffering from the `Project` and
reduces its responsibility. Now it just keeps its own state and
configuration and the envelope handling is done outside of it.
  • Loading branch information
olksdr authored Mar 6, 2023
1 parent 6161ae8 commit 35f5d86
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 170 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- Revert back the addition of metric names as tag on Sentry errors when relay drops metrics. ([#1873](https://github.com/getsentry/relay/pull/1873))
- Tag the dynamic sampling decision on `count_per_root_project` to measure effective sample rates. ([#1870](https://github.com/getsentry/relay/pull/1870))
- Deprecate fields on the profiling sample format. ([#1878](https://github.com/getsentry/relay/pull/1878))
- Move the pending envelopes buffering into the project cache. ([#1879](https://github.com/getsentry/relay/pull/1879))

## 23.2.0

Expand Down
124 changes: 6 additions & 118 deletions relay-server/src/actors/project.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -20,16 +19,15 @@ use relay_system::BroadcastChannel;

use crate::actors::envelopes::{EnvelopeManager, SendMetrics};
use crate::actors::outcome::{DiscardReason, Outcome};
use crate::actors::processor::{EnvelopeProcessor, ProcessEnvelope};
use crate::actors::project_cache::{
AddSamplingState, CheckedEnvelope, ProjectCache, RequestUpdate,
};
#[cfg(feature = "processing")]
use crate::actors::processor::EnvelopeProcessor;
use crate::actors::project_cache::{CheckedEnvelope, ProjectCache, RequestUpdate};
use crate::envelope::Envelope;
use crate::extractors::RequestMeta;

use crate::service::Registry;
use crate::statsd::RelayCounters;
use crate::utils::{self, EnvelopeContext, EnvelopeLimiter, MetricsLimiter, RetryBackoff};
use crate::utils::{EnvelopeContext, EnvelopeLimiter, MetricsLimiter, RetryBackoff};

#[cfg(feature = "processing")]
use crate::actors::processor::RateLimitFlushBuckets;
Expand Down Expand Up @@ -399,8 +397,6 @@ pub struct Project {
config: Arc<Config>,
state: Option<Arc<ProjectState>>,
state_channel: Option<StateChannel>,
pending_validations: VecDeque<(Box<Envelope>, EnvelopeContext)>,
pending_sampling: VecDeque<ProcessEnvelope>,
rate_limits: RateLimits,
last_no_cache: Instant,
}
Expand All @@ -416,8 +412,6 @@ impl Project {
config,
state: None,
state_channel: None,
pending_validations: VecDeque::new(),
pending_sampling: VecDeque::new(),
rate_limits: RateLimits::new(),
last_no_cache: Instant::now(),
}
Expand Down Expand Up @@ -453,7 +447,7 @@ impl Project {
/// Returns the project state if it is not expired.
///
/// Convenience wrapper around [`expiry_state`](Self::expiry_state).
fn valid_state(&self) -> Option<Arc<ProjectState>> {
pub fn valid_state(&self) -> Option<Arc<ProjectState>> {
match self.expiry_state() {
ExpiryState::Updated(state) => Some(state),
ExpiryState::Stale(state) => Some(state),
Expand Down Expand Up @@ -642,93 +636,9 @@ impl Project {
self.get_cached_state(no_cache);
}

/// Validates the envelope and submits the envelope to the next stage.
///
/// If this project is disabled or rate limited, corresponding items are dropped from the
/// envelope. Remaining items in the Envelope are forwarded:
/// - If the envelope needs dynamic sampling, this sends [`AddSamplingState`] to the
/// [`ProjectCache`] to add the required project state.
/// - Otherwise, the envelope is directly submitted to the [`EnvelopeProcessor`].
fn flush_validation(
&mut self,
envelope: Box<Envelope>,
envelope_context: EnvelopeContext,
project_state: Arc<ProjectState>,
) {
if let Ok(checked) = self.check_envelope(envelope, envelope_context) {
if let Some((envelope, envelope_context)) = checked.envelope {
let mut process = ProcessEnvelope {
envelope,
envelope_context,
project_state,
sampling_project_state: None,
};

if let Some(sampling_key) = utils::get_sampling_key(&process.envelope) {
let own_key = process
.project_state
.get_public_key_config()
.map(|c| c.public_key);

if Some(sampling_key) == own_key {
process.sampling_project_state = Some(process.project_state.clone());
EnvelopeProcessor::from_registry().send(process);
} else {
ProjectCache::from_registry()
.send(AddSamplingState::new(sampling_key, process));
}
} else {
EnvelopeProcessor::from_registry().send(process);
}
}
}
}

/// Enqueues an envelope for validation.
///
/// If the project state is up to date, the message will be immediately sent to the next stage.
/// Otherwise, this queues the envelope and flushes it when the project has been updated.
///
/// This method will trigger an update of the project state internally if the state is stale or
/// outdated.
pub fn enqueue_validation(&mut self, envelope: Box<Envelope>, context: EnvelopeContext) {
match self.get_cached_state(envelope.meta().no_cache()) {
Some(state) if !state.invalid() => self.flush_validation(envelope, context, state),
_ => self.pending_validations.push_back((envelope, context)),
}
}

/// Adds the project state for dynamic sampling and submits the Envelope for processing.
fn flush_sampling(&self, mut message: ProcessEnvelope) {
// Intentionally ignore all errors. Fallback sampling behavior applies in this case.
if let Some(state) = self.valid_state().filter(|state| !state.invalid()) {
// Never use rules from another organization.
if state.organization_id == message.project_state.organization_id {
message.sampling_project_state = Some(state);
}
}

EnvelopeProcessor::from_registry().send(message);
}

/// Enqueues an envelope for adding a dynamic sampling project state.
///
/// If the project state is up to date, the message will be immediately submitted for
/// processing. Otherwise, this queues the envelope and flushes it when the project has been
/// updated.
///
/// This method will trigger an update of the project state internally if the state is stale or
/// outdated.
pub fn enqueue_sampling(&mut self, message: ProcessEnvelope) {
match self.get_cached_state(message.envelope.meta().no_cache()) {
Some(_) => self.flush_sampling(message),
None => self.pending_sampling.push_back(message),
}
}

/// Replaces the internal project state with a new one and triggers pending actions.
///
/// This flushes pending envelopes from [`ValidateEnvelope`] and [`AddSamplingState`] and
/// This flushes pending envelopes from [`ValidateEnvelope`] and
/// notifies all pending receivers from [`get_state`](Self::get_state).
///
/// `no_cache` should be passed from the requesting call. Updates with `no_cache` will always
Expand Down Expand Up @@ -775,16 +685,6 @@ impl Project {
return;
}

// Flush all queued `ValidateEnvelope` messages
while let Some((envelope, context)) = self.pending_validations.pop_front() {
self.flush_validation(envelope, context, state.clone());
}

// Flush all queued `AddSamplingState` messages
while let Some(message) = self.pending_sampling.pop_front() {
self.flush_sampling(message);
}

// Flush all waiting recipients.
relay_log::debug!("project state {} updated", self.project_key);
channel.inner.send(state);
Expand Down Expand Up @@ -921,18 +821,6 @@ impl Project {
}
}

impl Drop for Project {
fn drop(&mut self) {
let count = self.pending_validations.len() + self.pending_sampling.len();
if count > 0 {
relay_log::with_scope(
|scope| scope.set_tag("project_key", self.project_key),
|| relay_log::error!("dropped project with {} envelopes", count),
);
}
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
Loading

0 comments on commit 35f5d86

Please sign in to comment.