ref(server): Transform EnvelopeManager into a sequential pipeline
jan-auer committed Aug 17, 2022
1 parent d3b9778 commit 453b92d
Showing 9 changed files with 394 additions and 448 deletions.
258 changes: 56 additions & 202 deletions relay-server/src/actors/envelopes.rs

Large diffs are not rendered by default.

157 changes: 100 additions & 57 deletions relay-server/src/actors/processor.rs
@@ -13,6 +13,7 @@ use failure::Fail;
use flate2::write::{GzEncoder, ZlibEncoder};
use flate2::Compression;
use lazy_static::lazy_static;
use parking_lot::RwLock;
use serde_json::Value as SerdeValue;

use relay_auth::RelayVersion;
@@ -35,13 +36,15 @@ use relay_redis::RedisPool;
use relay_sampling::RuleId;
use relay_statsd::metric;

use crate::actors::envelopes::{SendEnvelope, SendEnvelopeError};
use crate::actors::envelopes::{
Capture, EnvelopeManager, SendEnvelope, SendEnvelopeError, SubmitEnvelope,
};
use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::actors::outcome_aggregator::OutcomeAggregator;
use crate::actors::project::{Feature, ProjectState};
use crate::actors::project_cache::{InsertMetrics, MergeBuckets, ProjectCache, ProjectError};
use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequestError};
use crate::envelope::{AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemType};
use crate::actors::project_cache::{InsertMetrics, MergeBuckets, ProjectCache};
use crate::actors::upstream::{SendRequest, UpstreamRelay};
use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
use crate::metrics_extraction::sessions::{extract_session_metrics, SessionMetricsConfig};
use crate::metrics_extraction::transactions::extract_transaction_metrics;
use crate::service::ServerError;
@@ -52,7 +55,6 @@ use crate::utils::{

#[cfg(feature = "processing")]
use {
crate::actors::store::StoreError,
crate::service::ServerErrorKind,
crate::utils::EnvelopeLimiter,
failure::ResultExt,
@@ -64,6 +66,12 @@ use {
/// The minimum clock drift for correction to apply.
const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

/// Singleton of the `EnvelopeProcessor` service.
///
/// Since `EnvelopeProcessor` runs in a SyncArbiter, it cannot implement `SystemService` and cannot
/// be put in the actix `SystemRegistry`.
static ADDRESS: RwLock<Option<Addr<EnvelopeProcessor>>> = RwLock::new(None);
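
The doc comment above explains why this static exists. As a hedged aside (not part of the commit), the same publish-once / read-many pattern can be sketched without actix; `Handle` below is a hypothetical stand-in for `Addr<EnvelopeProcessor>`, and the sketch assumes a `parking_lot` version whose `RwLock::new` is `const`, as this diff itself does:

use parking_lot::RwLock;

/// Hypothetical stand-in for `Addr<EnvelopeProcessor>`; any cheaply clonable handle works.
#[derive(Clone, Debug)]
struct Handle(u32);

/// Filled exactly once during startup, then only read.
static ADDRESS: RwLock<Option<Handle>> = RwLock::new(None);

/// Mirrors the assignment at the end of `EnvelopeProcessor::start` further down.
fn register(handle: Handle) {
    *ADDRESS.write() = Some(handle);
}

/// Mirrors `EnvelopeProcessor::from_registry`; panics if called before `register`.
fn from_registry() -> Handle {
    ADDRESS.read().clone().unwrap()
}

fn main() {
    register(Handle(1));
    println!("{:?}", from_registry());
}

The write lock is taken only once at startup; every later caller takes a cheap read lock and clones the handle out.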

/// An error returned when handling [`ProcessEnvelope`].
#[derive(Debug, Fail)]
pub enum ProcessingError {
@@ -92,12 +100,6 @@ pub enum ProcessingError {
#[fail(display = "failed to extract event payload")]
NoEventPayload,

#[fail(display = "could not schedule project fetch")]
ScheduleFailed,

#[fail(display = "failed to resolve project information")]
ProjectFailed(#[cause] ProjectError),

#[fail(display = "missing project id in DSN")]
MissingProjectId,

@@ -107,31 +109,27 @@
#[fail(display = "invalid security report")]
InvalidSecurityReport(#[cause] serde_json::Error),

#[fail(display = "submission rejected with reason: {:?}", _0)]
Rejected(DiscardReason),

#[fail(display = "event filtered with reason: {:?}", _0)]
EventFiltered(FilterStatKey),

#[fail(display = "could not serialize event payload")]
SerializeFailed(#[cause] serde_json::Error),

#[fail(display = "could not build envelope for upstream")]
EnvelopeBuildFailed(#[cause] EnvelopeError),
// #[fail(display = "could not build envelope for upstream")]
// EnvelopeBuildFailed(#[cause] EnvelopeError),

#[fail(display = "could not encode request body")]
BodyEncodingFailed(#[cause] std::io::Error),
// #[fail(display = "could not encode request body")]
// BodyEncodingFailed(#[cause] std::io::Error),

#[fail(display = "could not send request to upstream")]
UpstreamRequestFailed(#[cause] UpstreamRequestError),
// #[fail(display = "could not send request to upstream")]
// UpstreamRequestFailed(#[cause] UpstreamRequestError),

#[cfg(feature = "processing")]
#[fail(display = "could not store envelope")]
StoreFailed(#[cause] StoreError),

#[fail(display = "envelope items were rate limited")]
RateLimited,
// #[cfg(feature = "processing")]
// #[fail(display = "could not store envelope")]
// StoreFailed(#[cause] StoreError),

// #[fail(display = "envelope items were rate limited")]
// RateLimited,
#[cfg(feature = "processing")]
#[fail(display = "failed to apply quotas")]
QuotasFailed(#[cause] RateLimitingError),
@@ -141,7 +139,7 @@
}

impl ProcessingError {
pub fn to_outcome(&self) -> Option<Outcome> {
fn to_outcome(&self) -> Option<Outcome> {
match *self {
// General outcomes for invalid events
Self::PayloadTooLarge => Some(Outcome::Invalid(DiscardReason::TooLarge)),
@@ -164,29 +162,31 @@ impl ProcessingError {
Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),

// Internal errors
Self::SerializeFailed(_)
| Self::ProjectFailed(_)
| Self::ProcessingFailed(_)
| Self::MissingProjectId => Some(Outcome::Invalid(DiscardReason::Internal)),
Self::SerializeFailed(_) | Self::ProcessingFailed(_) | Self::MissingProjectId => {
Some(Outcome::Invalid(DiscardReason::Internal))
}
#[cfg(feature = "processing")]
Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),

// These outcomes are emitted at the source.
Self::ScheduleFailed => None,
Self::Rejected(_) => None,
// Self::Rejected(_) => None,
Self::EventFiltered(_) => None,
Self::Sampled(_) => None,
Self::RateLimited => None,
#[cfg(feature = "processing")]
Self::StoreFailed(_) => None,
// Self::RateLimited => None,
// #[cfg(feature = "processing")]
// Self::StoreFailed(_) => None,

// If we send to an upstream, we don't emit outcomes.
Self::UpstreamRequestFailed(_)
| Self::EnvelopeBuildFailed(_)
| Self::BodyEncodingFailed(_) => None,
// Self::UpstreamRequestFailed(_)
// | Self::EnvelopeBuildFailed(_)
// | Self::BodyEncodingFailed(_) => None,
}
}

fn is_internal(&self) -> bool {
self.to_outcome() == Some(Outcome::Invalid(DiscardReason::Internal))
}

fn should_keep_metrics(&self) -> bool {
matches!(self, Self::Sampled(_))
}
@@ -385,16 +385,17 @@ pub struct EnvelopeProcessor {
}

impl EnvelopeProcessor {
pub fn from_registry() -> Addr<Self> {
ADDRESS.read().clone().unwrap()
}

/// Starts a multi-threaded envelope processor.
pub fn start(
config: Arc<Config>,
_redis: Option<RedisPool>,
) -> Result<Addr<Self>, ServerError> {
pub fn start(config: Arc<Config>, _redis: Option<RedisPool>) -> Result<(), ServerError> {
let thread_count = config.cpu_concurrency();
relay_log::info!("starting {} envelope processing workers", thread_count);

#[cfg(feature = "processing")]
{
let addr = {
let geoip_lookup = match config.geoip_path() {
Some(p) => Some(Arc::new(
GeoIpLookup::open(p).context(ServerErrorKind::GeoIpError)?,
@@ -405,21 +406,24 @@
let rate_limiter =
_redis.map(|pool| RedisRateLimiter::new(pool).max_limit(config.max_rate_limit()));

Ok(SyncArbiter::start(
SyncArbiter::start(
thread_count,
clone!(config, || {
EnvelopeProcessor::new(config.clone())
.with_rate_limiter(rate_limiter.clone())
.with_geoip_lookup(geoip_lookup.clone())
}),
))
}
)
};

#[cfg(not(feature = "processing"))]
Ok(SyncArbiter::start(
let addr = SyncArbiter::start(
thread_count,
clone!(config, || EnvelopeProcessor::new(config.clone())),
))
);

*ADDRESS.write() = Some(addr);
Ok(())
}

#[inline]
@@ -1526,6 +1530,8 @@ impl EnvelopeProcessor {

#[cfg(feature = "processing")]
fn enforce_quotas(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> {
use crate::actors::project_cache::UpdateRateLimits;

let rate_limiter = match self.rate_limiter.as_ref() {
Some(rate_limiter) => rate_limiter,
None => return Ok(()),
@@ -1554,12 +1560,18 @@
envelope_limiter.assume_event(category);
}

let scoping = state.envelope_context.scoping();
let (enforcement, limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), {
envelope_limiter
.enforce(&mut state.envelope, &state.envelope_context.scoping())
.enforce(&mut state.envelope, &scoping)
.map_err(ProcessingError::QuotasFailed)?
});

if limits.is_limited() {
ProjectCache::from_registry()
.do_send(UpdateRateLimits::new(scoping.project_key, limits.clone()));
}

state.rate_limits = limits;
enforcement.track_outcomes(&state.envelope, &state.envelope_context.scoping());
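
The `is_limited` branch above feeds freshly observed limits back to the project cache, so later envelopes can be rejected before they ever reach a processor worker. A minimal sketch of that feedback idea follows; the names are hypothetical stand-ins for Relay's real `ProjectCache` and `UpdateRateLimits` types:

use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the project cache's rate-limit bookkeeping.
#[derive(Default)]
struct ProjectCacheSketch {
    /// project key -> limited until this instant
    limits: HashMap<String, Instant>,
}

impl ProjectCacheSketch {
    /// Called by the processor whenever the rate limiter reports active limits.
    fn update_rate_limits(&mut self, project_key: &str, retry_after: Duration) {
        self.limits
            .insert(project_key.to_owned(), Instant::now() + retry_after);
    }

    /// Called on the fast path, before an envelope is queued for processing.
    fn is_limited(&self, project_key: &str) -> bool {
        self.limits
            .get(project_key)
            .map_or(false, |until| Instant::now() < *until)
    }
}

fn main() {
    let mut cache = ProjectCacheSketch::default();
    assert!(!cache.is_limited("abc123"));

    // A quota was hit during processing: report it back so the cache rejects early.
    cache.update_rate_limits("abc123", Duration::from_secs(60));
    assert!(cache.is_limited("abc123"));
}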

@@ -1822,11 +1834,12 @@ impl EnvelopeProcessor {
&self,
message: ProcessEnvelope,
) -> Result<ProcessEnvelopeResponse, ProcessingError> {
let mut state = self.prepare_state(message)?;
let mut state = self.prepare_state(message)?; // TODO(ja): Also catch this error

let project_id = state.project_id;
let client = state.envelope.meta().client().map(str::to_owned);
let user_agent = state.envelope.meta().user_agent().map(str::to_owned);
let project_key = state.envelope.meta().public_key();

relay_log::with_scope(
|scope| {
Expand All @@ -1839,8 +1852,6 @@ impl EnvelopeProcessor {
}
},
|| {
let project_key = state.envelope_context.scoping().project_key;

match self.process_state(&mut state) {
Ok(()) => {
if !state.extracted_metrics.is_empty() {
@@ -1952,18 +1963,50 @@ pub struct ProcessEnvelope {
}

impl Message for ProcessEnvelope {
type Result = Result<ProcessEnvelopeResponse, ProcessingError>;
type Result = ();
}

// TODO(ja): Log dropped envelopes

impl Handler<ProcessEnvelope> for EnvelopeProcessor {
type Result = Result<ProcessEnvelopeResponse, ProcessingError>;
type Result = ();

fn handle(&mut self, message: ProcessEnvelope, _context: &mut Self::Context) -> Self::Result {
let event_id = message.envelope.event_id();
let project_key = message.envelope.meta().public_key();
let wait_time = message.envelope_context.start_time().elapsed();
metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
metric!(timer(RelayTimers::EnvelopeProcessingTime), {

let result = metric!(timer(RelayTimers::EnvelopeProcessingTime), {
self.process(message)
})
});

match result {
Ok(response) => {
if let Some((envelope, envelope_context)) = response.envelope {
EnvelopeManager::from_registry().do_send(SubmitEnvelope {
envelope,
envelope_context,
})
};
}
Err(error) => {
if Capture::should_capture(&self.config) {
EnvelopeManager::from_registry().do_send(Capture::rejected(event_id, &error));
}

// Errors are only logged for what we consider infrastructure or implementation
// bugs. In other cases, we "expect" errors and log them as debug level.
if error.is_internal() {
relay_log::with_scope(
|scope| scope.set_tag("project_key", project_key),
|| relay_log::error!("error processing envelope: {}", LogError(&error)),
);
} else {
relay_log::debug!("dropped envelope: {}", LogError(&error));
}
}
}
}
}

