Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "ref(cache): Move buffering of pending envelope to ProjectCache" #1906

Merged
merged 2 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
- Revert back the addition of metric names as tag on Sentry errors when relay drops metrics. ([#1873](https://github.com/getsentry/relay/pull/1873))
- Tag the dynamic sampling decision on `count_per_root_project` to measure effective sample rates. ([#1870](https://github.com/getsentry/relay/pull/1870))
- Deprecate fields on the profiling sample format. ([#1878](https://github.com/getsentry/relay/pull/1878))
- Move the pending envelopes buffering into the project cache. ([#1879](https://github.com/getsentry/relay/pull/1879))

## 23.2.0

Expand Down
124 changes: 118 additions & 6 deletions relay-server/src/actors/project.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -19,15 +20,16 @@ use relay_system::BroadcastChannel;

use crate::actors::envelopes::{EnvelopeManager, SendMetrics};
use crate::actors::outcome::{DiscardReason, Outcome};
#[cfg(feature = "processing")]
use crate::actors::processor::EnvelopeProcessor;
use crate::actors::project_cache::{CheckedEnvelope, ProjectCache, RequestUpdate};
use crate::actors::processor::{EnvelopeProcessor, ProcessEnvelope};
use crate::actors::project_cache::{
AddSamplingState, CheckedEnvelope, ProjectCache, RequestUpdate,
};
use crate::envelope::Envelope;
use crate::extractors::RequestMeta;

use crate::service::Registry;
use crate::statsd::RelayCounters;
use crate::utils::{EnvelopeContext, EnvelopeLimiter, MetricsLimiter, RetryBackoff};
use crate::utils::{self, EnvelopeContext, EnvelopeLimiter, MetricsLimiter, RetryBackoff};

#[cfg(feature = "processing")]
use crate::actors::processor::RateLimitFlushBuckets;
Expand Down Expand Up @@ -397,6 +399,8 @@ pub struct Project {
config: Arc<Config>,
state: Option<Arc<ProjectState>>,
state_channel: Option<StateChannel>,
pending_validations: VecDeque<(Box<Envelope>, EnvelopeContext)>,
pending_sampling: VecDeque<ProcessEnvelope>,
rate_limits: RateLimits,
last_no_cache: Instant,
}
Expand All @@ -412,6 +416,8 @@ impl Project {
config,
state: None,
state_channel: None,
pending_validations: VecDeque::new(),
pending_sampling: VecDeque::new(),
rate_limits: RateLimits::new(),
last_no_cache: Instant::now(),
}
Expand Down Expand Up @@ -447,7 +453,7 @@ impl Project {
/// Returns the project state if it is not expired.
///
/// Convenience wrapper around [`expiry_state`](Self::expiry_state).
pub fn valid_state(&self) -> Option<Arc<ProjectState>> {
fn valid_state(&self) -> Option<Arc<ProjectState>> {
match self.expiry_state() {
ExpiryState::Updated(state) => Some(state),
ExpiryState::Stale(state) => Some(state),
Expand Down Expand Up @@ -636,9 +642,93 @@ impl Project {
self.get_cached_state(no_cache);
}

/// Validates the envelope and submits the envelope to the next stage.
///
/// If this project is disabled or rate limited, corresponding items are dropped from the
/// envelope. Remaining items in the Envelope are forwarded:
/// - If the envelope needs dynamic sampling, this sends [`AddSamplingState`] to the
/// [`ProjectCache`] to add the required project state.
/// - Otherwise, the envelope is directly submitted to the [`EnvelopeProcessor`].
fn flush_validation(
&mut self,
envelope: Box<Envelope>,
envelope_context: EnvelopeContext,
project_state: Arc<ProjectState>,
) {
if let Ok(checked) = self.check_envelope(envelope, envelope_context) {
if let Some((envelope, envelope_context)) = checked.envelope {
let mut process = ProcessEnvelope {
envelope,
envelope_context,
project_state,
sampling_project_state: None,
};

if let Some(sampling_key) = utils::get_sampling_key(&process.envelope) {
let own_key = process
.project_state
.get_public_key_config()
.map(|c| c.public_key);

if Some(sampling_key) == own_key {
process.sampling_project_state = Some(process.project_state.clone());
EnvelopeProcessor::from_registry().send(process);
} else {
ProjectCache::from_registry()
.send(AddSamplingState::new(sampling_key, process));
}
} else {
EnvelopeProcessor::from_registry().send(process);
}
}
}
}

/// Enqueues an envelope for validation.
///
/// If the project state is up to date, the message will be immediately sent to the next stage.
/// Otherwise, this queues the envelope and flushes it when the project has been updated.
///
/// This method will trigger an update of the project state internally if the state is stale or
/// outdated.
pub fn enqueue_validation(&mut self, envelope: Box<Envelope>, context: EnvelopeContext) {
match self.get_cached_state(envelope.meta().no_cache()) {
Some(state) if !state.invalid() => self.flush_validation(envelope, context, state),
_ => self.pending_validations.push_back((envelope, context)),
}
}

/// Adds the project state for dynamic sampling and submits the Envelope for processing.
fn flush_sampling(&self, mut message: ProcessEnvelope) {
// Intentionally ignore all errors. Fallback sampling behavior applies in this case.
if let Some(state) = self.valid_state().filter(|state| !state.invalid()) {
// Never use rules from another organization.
if state.organization_id == message.project_state.organization_id {
message.sampling_project_state = Some(state);
}
}

EnvelopeProcessor::from_registry().send(message);
}

/// Enqueues an envelope for adding a dynamic sampling project state.
///
/// If the project state is up to date, the message will be immediately submitted for
/// processing. Otherwise, this queues the envelope and flushes it when the project has been
/// updated.
///
/// This method will trigger an update of the project state internally if the state is stale or
/// outdated.
pub fn enqueue_sampling(&mut self, message: ProcessEnvelope) {
match self.get_cached_state(message.envelope.meta().no_cache()) {
Some(_) => self.flush_sampling(message),
None => self.pending_sampling.push_back(message),
}
}

/// Replaces the internal project state with a new one and triggers pending actions.
///
/// This flushes pending envelopes from [`ValidateEnvelope`] and
/// This flushes pending envelopes from [`ValidateEnvelope`] and [`AddSamplingState`] and
/// notifies all pending receivers from [`get_state`](Self::get_state).
///
/// `no_cache` should be passed from the requesting call. Updates with `no_cache` will always
Expand Down Expand Up @@ -685,6 +775,16 @@ impl Project {
return;
}

// Flush all queued `ValidateEnvelope` messages
while let Some((envelope, context)) = self.pending_validations.pop_front() {
self.flush_validation(envelope, context, state.clone());
}

// Flush all queued `AddSamplingState` messages
while let Some(message) = self.pending_sampling.pop_front() {
self.flush_sampling(message);
}

// Flush all waiting recipients.
relay_log::debug!("project state {} updated", self.project_key);
channel.inner.send(state);
Expand Down Expand Up @@ -821,6 +921,18 @@ impl Project {
}
}

impl Drop for Project {
fn drop(&mut self) {
let count = self.pending_validations.len() + self.pending_sampling.len();
if count > 0 {
relay_log::with_scope(
|scope| scope.set_tag("project_key", self.project_key),
|| relay_log::error!("dropped project with {} envelopes", count),
);
}
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
Loading