Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(server): Transform EnvelopeManager into a sequential pipeline #1416

Merged
merged 13 commits into from
Aug 18, 2022
Merged
281 changes: 67 additions & 214 deletions relay-server/src/actors/envelopes.rs

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions relay-server/src/actors/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::fmt;
use std::mem;
use std::net::IpAddr;
use std::sync::Arc;
Expand Down Expand Up @@ -197,6 +198,21 @@ impl Outcome {
}
}

impl fmt::Display for Outcome {
jan-auer marked this conversation as resolved.
Show resolved Hide resolved
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Outcome::Filtered(key) => write!(f, "filtered by {}", key),
Outcome::FilteredSampling(rule) => write!(f, "sampling rule {}", rule),
Outcome::RateLimited(None) => write!(f, "rate limited"),
Outcome::RateLimited(Some(reason)) => write!(f, "rate limited with reason {}", reason),
Outcome::Invalid(DiscardReason::Internal) => write!(f, "internal error"),
Outcome::Invalid(reason) => write!(f, "invalid data ({})", reason),
Outcome::Abuse => write!(f, "abuse limit reached"),
Outcome::ClientDiscard(reason) => write!(f, "discarded by client ({})", reason),
}
}
}

/// Reason for a discarded invalid event.
///
/// Used in `Outcome::Invalid`. Synchronize overlap with Sentry.
Expand Down Expand Up @@ -351,6 +367,12 @@ impl DiscardReason {
}
}

/// Formats the discard reason using its canonical name.
impl fmt::Display for DiscardReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to `name()` so the display form stays in sync with the
        // canonical reason strings defined on this type.
        let name = self.name();
        write!(f, "{}", name)
    }
}

/// The outcome message is serialized as json and placed on the Kafka topic or in
/// the http using TrackRawOutcome
#[derive(Debug, Serialize, Deserialize, Clone)]
Expand Down
119 changes: 63 additions & 56 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,24 @@ use relay_redis::RedisPool;
use relay_sampling::RuleId;
use relay_statsd::metric;

use crate::actors::envelopes::{SendEnvelope, SendEnvelopeError};
use crate::actors::envelopes::{EnvelopeManager, SendEnvelope, SendEnvelopeError, SubmitEnvelope};
use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::actors::outcome_aggregator::OutcomeAggregator;
use crate::actors::project::{Feature, ProjectState};
use crate::actors::project_cache::{InsertMetrics, MergeBuckets, ProjectCache, ProjectError};
use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequestError};
use crate::envelope::{AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemType};
use crate::actors::project_cache::{InsertMetrics, MergeBuckets, ProjectCache};
use crate::actors::upstream::{SendRequest, UpstreamRelay};
use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
use crate::metrics_extraction::sessions::{extract_session_metrics, SessionMetricsConfig};
use crate::metrics_extraction::transactions::extract_transaction_metrics;
use crate::service::ServerError;
use crate::service::{ServerError, REGISTRY};
use crate::statsd::{RelayCounters, RelayTimers};
use crate::utils::{
self, ChunkedFormDataAggregator, EnvelopeContext, ErrorBoundary, FormDataIter, SamplingResult,
};

#[cfg(feature = "processing")]
use {
crate::actors::store::StoreError,
crate::actors::project_cache::UpdateRateLimits,
crate::service::ServerErrorKind,
crate::utils::EnvelopeLimiter,
failure::ResultExt,
Expand Down Expand Up @@ -92,12 +92,6 @@ pub enum ProcessingError {
#[fail(display = "failed to extract event payload")]
NoEventPayload,

#[fail(display = "could not schedule project fetch")]
ScheduleFailed,

#[fail(display = "failed to resolve project information")]
ProjectFailed(#[cause] ProjectError),

#[fail(display = "missing project id in DSN")]
MissingProjectId,

Expand All @@ -107,31 +101,12 @@ pub enum ProcessingError {
#[fail(display = "invalid security report")]
InvalidSecurityReport(#[cause] serde_json::Error),

#[fail(display = "submission rejected with reason: {:?}", _0)]
Rejected(DiscardReason),

#[fail(display = "event filtered with reason: {:?}", _0)]
EventFiltered(FilterStatKey),

#[fail(display = "could not serialize event payload")]
SerializeFailed(#[cause] serde_json::Error),

#[fail(display = "could not build envelope for upstream")]
EnvelopeBuildFailed(#[cause] EnvelopeError),

#[fail(display = "could not encode request body")]
BodyEncodingFailed(#[cause] std::io::Error),

#[fail(display = "could not send request to upstream")]
UpstreamRequestFailed(#[cause] UpstreamRequestError),

#[cfg(feature = "processing")]
#[fail(display = "could not store envelope")]
StoreFailed(#[cause] StoreError),

#[fail(display = "envelope items were rate limited")]
RateLimited,

#[cfg(feature = "processing")]
#[fail(display = "failed to apply quotas")]
QuotasFailed(#[cause] RateLimitingError),
Expand All @@ -141,7 +116,7 @@ pub enum ProcessingError {
}

impl ProcessingError {
pub fn to_outcome(&self) -> Option<Outcome> {
fn to_outcome(&self) -> Option<Outcome> {
match *self {
// General outcomes for invalid events
Self::PayloadTooLarge => Some(Outcome::Invalid(DiscardReason::TooLarge)),
Expand All @@ -164,29 +139,23 @@ impl ProcessingError {
Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),

// Internal errors
Self::SerializeFailed(_)
| Self::ProjectFailed(_)
| Self::ProcessingFailed(_)
| Self::MissingProjectId => Some(Outcome::Invalid(DiscardReason::Internal)),
Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
Some(Outcome::Invalid(DiscardReason::Internal))
}
#[cfg(feature = "processing")]
Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),

// These outcomes are emitted at the source.
Self::ScheduleFailed => None,
Self::Rejected(_) => None,
Self::MissingProjectId => None,
Self::EventFiltered(_) => None,
Self::Sampled(_) => None,
Self::RateLimited => None,
#[cfg(feature = "processing")]
Self::StoreFailed(_) => None,

// If we send to an upstream, we don't emit outcomes.
Self::UpstreamRequestFailed(_)
| Self::EnvelopeBuildFailed(_)
| Self::BodyEncodingFailed(_) => None,
}
}

fn is_internal(&self) -> bool {
self.to_outcome() == Some(Outcome::Invalid(DiscardReason::Internal))
}

fn should_keep_metrics(&self) -> bool {
matches!(self, Self::Sampled(_))
}
Expand Down Expand Up @@ -385,6 +354,10 @@ pub struct EnvelopeProcessor {
}

impl EnvelopeProcessor {
/// Returns the address of the [`EnvelopeProcessor`] from the global registry.
///
/// # Panics
///
/// Panics if the service registry has not been initialized yet.
pub fn from_registry() -> Addr<Self> {
    let registry = REGISTRY.get().unwrap();
    registry.processor.clone()
}

/// Starts a multi-threaded envelope processor.
pub fn start(
config: Arc<Config>,
Expand Down Expand Up @@ -927,7 +900,7 @@ impl EnvelopeProcessor {
) -> Result<ProcessEnvelopeState, ProcessingError> {
let ProcessEnvelope {
mut envelope,
envelope_context,
mut envelope_context,
project_state,
sampling_project_state,
} = message;
Expand All @@ -944,10 +917,16 @@ impl EnvelopeProcessor {
//
// Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
// since we cannot process an envelope without project ID, so drop it.
let project_id = project_state
let project_id = match project_state
.project_id
.or_else(|| envelope.meta().project_id())
.ok_or(ProcessingError::MissingProjectId)?;
{
Some(project_id) => project_id,
None => {
envelope_context.reject(Outcome::Invalid(DiscardReason::Internal));
jan-auer marked this conversation as resolved.
Show resolved Hide resolved
return Err(ProcessingError::MissingProjectId);
}
};

// Ensure the project ID is updated to the stored instance for this project cache. This can
// differ in two cases:
Expand Down Expand Up @@ -1554,12 +1533,18 @@ impl EnvelopeProcessor {
envelope_limiter.assume_event(category);
}

let scoping = state.envelope_context.scoping();
let (enforcement, limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), {
envelope_limiter
.enforce(&mut state.envelope, &state.envelope_context.scoping())
.enforce(&mut state.envelope, &scoping)
.map_err(ProcessingError::QuotasFailed)?
});

if limits.is_limited() {
ProjectCache::from_registry()
.do_send(UpdateRateLimits::new(scoping.project_key, limits.clone()));
}

state.rate_limits = limits;
enforcement.track_outcomes(&state.envelope, &state.envelope_context.scoping());

Expand Down Expand Up @@ -1827,6 +1812,7 @@ impl EnvelopeProcessor {
let project_id = state.project_id;
let client = state.envelope.meta().client().map(str::to_owned);
let user_agent = state.envelope.meta().user_agent().map(str::to_owned);
let project_key = state.envelope.meta().public_key();

relay_log::with_scope(
|scope| {
Expand All @@ -1839,8 +1825,6 @@ impl EnvelopeProcessor {
}
},
|| {
let project_key = state.envelope_context.scoping().project_key;

match self.process_state(&mut state) {
Ok(()) => {
if !state.extracted_metrics.is_empty() {
Expand Down Expand Up @@ -1952,18 +1936,41 @@ pub struct ProcessEnvelope {
}

/// Fire-and-forget message: the handler forwards results to the
/// `EnvelopeManager` itself instead of returning them to the sender.
///
/// NOTE(review): the diff scrape contained both the old associated type
/// (`Result<ProcessEnvelopeResponse, ProcessingError>`) and the new one; this
/// keeps the new `()` result, consistent with the handler implementation.
impl Message for ProcessEnvelope {
    type Result = ();
}

/// Processes an envelope and forwards the result to the envelope manager.
///
/// NOTE(review): the diff scrape contained both the old and new `type Result`
/// lines; this keeps the new fire-and-forget version (`()`), matching the body
/// below which submits or logs instead of replying.
impl Handler<ProcessEnvelope> for EnvelopeProcessor {
    type Result = ();

    fn handle(&mut self, message: ProcessEnvelope, _context: &mut Self::Context) -> Self::Result {
        // Capture the project key up front; `message` is consumed by `process`.
        let project_key = message.envelope.meta().public_key();

        // Time spent queued before processing started.
        let wait_time = message.envelope_context.start_time().elapsed();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        let result = metric!(timer(RelayTimers::EnvelopeProcessingTime), {
            self.process(message)
        });

        match result {
            Ok(response) => {
                // A `None` envelope means everything was extracted or dropped;
                // only surviving envelopes are submitted upstream.
                if let Some((envelope, envelope_context)) = response.envelope {
                    EnvelopeManager::from_registry().do_send(SubmitEnvelope {
                        envelope,
                        envelope_context,
                    })
                };
            }
            Err(error) => {
                // Errors are only logged for what we consider infrastructure or implementation
                // bugs. In other cases, we "expect" errors and log them as debug level.
                if error.is_internal() {
                    relay_log::with_scope(
                        |scope| scope.set_tag("project_key", project_key),
                        || relay_log::error!("error processing envelope: {}", LogError(&error)),
                    );
                }
            }
        }
    }
}

Expand Down
Loading