ref(server): Transform EnvelopeManager into a sequential pipeline
jan-auer committed Aug 17, 2022
1 parent d3b9778 commit 11d2d7f
Showing 8 changed files with 394 additions and 383 deletions.
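
Read from the envelopes.rs diff below, the new shape is: the QueueEnvelope handler no longer notifies the manager itself with HandleEnvelope but hands the envelope to the project cache as a ValidateEnvelope message, and a new SubmitEnvelope message on EnvelopeManager takes a validated envelope and sends it upstream (the sender of SubmitEnvelope sits in the other changed files, which are not shown here). The following is a minimal, self-contained sketch of that hop sequence using plain std channels: the Envelope type, its fields, the reply channel, and all plumbing are simplified stand-ins rather than the real actix actors; only the message names and the direction of the hops come from the diff.

```rust
use std::sync::mpsc;
use std::thread;

// Stand-in envelope; the real type lives in relay-server's envelope module.
struct Envelope {
    project_key: String,
    payload: String,
}

// Mirrors ValidateEnvelope: ask the project cache to check an envelope
// against project state. The reply channel stands in for actor addresses.
struct ValidateEnvelope {
    envelope: Envelope,
    reply: mpsc::Sender<SubmitEnvelope>,
}

// Mirrors SubmitEnvelope: hand a validated envelope back to the manager
// so it can be sent upstream.
struct SubmitEnvelope {
    envelope: Envelope,
}

fn main() {
    let (cache_tx, cache_rx) = mpsc::channel::<ValidateEnvelope>();
    let (manager_tx, manager_rx) = mpsc::channel::<SubmitEnvelope>();

    // Stand-in for the ProjectCache actor: validate, then forward.
    thread::spawn(move || {
        for ValidateEnvelope { envelope, reply } in cache_rx {
            if !envelope.payload.is_empty() {
                let _ = reply.send(SubmitEnvelope { envelope });
            }
        }
    });

    // QueueEnvelope step: instead of notifying itself with HandleEnvelope,
    // the manager hands the envelope to the project cache for validation.
    let envelope = Envelope {
        project_key: "e12d836b15bb49d7bbf99e64295d995b".into(),
        payload: "{}".into(),
    };
    let _ = cache_tx.send(ValidateEnvelope {
        envelope,
        reply: manager_tx.clone(),
    });
    drop(cache_tx);
    drop(manager_tx);

    // SubmitEnvelope step: the manager sends whatever comes back upstream.
    for SubmitEnvelope { envelope } in manager_rx {
        println!("sending envelope for project {} upstream", envelope.project_key);
    }
}
```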
225 changes: 65 additions & 160 deletions relay-server/src/actors/envelopes.rs
@@ -19,19 +19,15 @@ use relay_quotas::Scoping;
 use relay_statsd::metric;
 
 use crate::actors::outcome::{DiscardReason, Outcome};
-use crate::actors::processor::{
-    EncodeEnvelope, EnvelopeProcessor, ProcessEnvelope, ProcessMetrics, ProcessingError,
-};
-use crate::actors::project_cache::{
-    CheckEnvelope, GetProjectState, ProjectCache, UpdateRateLimits,
-};
+use crate::actors::processor::{EncodeEnvelope, EnvelopeProcessor, ProcessMetrics};
+use crate::actors::project_cache::{ProjectCache, UpdateRateLimits, ValidateEnvelope};
 use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequest, UpstreamRequestError};
 use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType};
 use crate::extractors::{PartialDsn, RequestMeta};
 use crate::http::{HttpError, Request, RequestBuilder, Response};
 use crate::service::ServerError;
-use crate::statsd::{RelayHistograms, RelaySets};
-use crate::utils::{self, EnvelopeContext, FutureExt as _, Semaphore};
+use crate::statsd::RelayHistograms;
+use crate::utils::{EnvelopeContext, Semaphore};
 
 #[cfg(feature = "processing")]
 use {
@@ -207,7 +203,6 @@ pub struct EnvelopeManager {
     config: Arc<Config>,
     buffer_guard: Arc<BufferGuard>,
     captures: BTreeMap<EventId, CapturedEnvelope>,
-    processor: Addr<EnvelopeProcessor>,
     #[cfg(feature = "processing")]
     store_forwarder: Option<StoreAddr<StoreEnvelope>>,
     #[cfg(feature = "processing")]
@@ -217,7 +212,6 @@
 impl EnvelopeManager {
     pub fn create(
         config: Arc<Config>,
-        processor: Addr<EnvelopeProcessor>,
         buffer_guard: Arc<BufferGuard>,
     ) -> Result<Self, ServerError> {
         // Enter the tokio runtime so we can start spawning tasks from the outside.
@@ -243,7 +237,6 @@ impl EnvelopeManager {
             config,
             buffer_guard,
             captures: BTreeMap::new(),
-            processor,
             #[cfg(feature = "processing")]
             store_forwarder,
             #[cfg(feature = "processing")]
@@ -316,7 +309,7 @@ impl EnvelopeManager {
         if let HttpEncoding::Identity = request.http_encoding {
             UpstreamRelay::from_registry().do_send(SendRequest(request));
         } else {
-            self.processor.do_send(EncodeEnvelope::new(request));
+            EnvelopeProcessor::from_registry().do_send(EncodeEnvelope::new(request));
         }
 
         Box::new(
@@ -385,7 +378,7 @@ impl Message for QueueEnvelope {
 impl Handler<QueueEnvelope> for EnvelopeManager {
     type Result = Result<Option<EventId>, QueueEnvelopeError>;
 
-    fn handle(&mut self, message: QueueEnvelope, context: &mut Self::Context) -> Self::Result {
+    fn handle(&mut self, message: QueueEnvelope, _: &mut Self::Context) -> Self::Result {
         let QueueEnvelope {
             mut envelope,
             mut envelope_context,
@@ -404,7 +397,7 @@ impl Handler<QueueEnvelope> for EnvelopeManager {
 
         if !metric_items.is_empty() {
             relay_log::trace!("sending metrics into processing queue");
-            self.processor.do_send(ProcessMetrics {
+            EnvelopeProcessor::from_registry().do_send(ProcessMetrics {
                 items: metric_items,
                 project_key,
                 start_time,
@@ -424,11 +417,11 @@ impl Handler<QueueEnvelope> for EnvelopeManager {
             // Update the old context after successful forking.
             envelope_context.update(&envelope);
 
-            context.notify(HandleEnvelope {
-                envelope: event_envelope,
-                envelope_context: event_context,
+            ProjectCache::from_registry().do_send(ValidateEnvelope::new(
                 project_key,
-            });
+                event_envelope,
+                event_context,
+            ));
         }
 
         if envelope.is_empty() {
@@ -437,11 +430,11 @@ impl Handler<QueueEnvelope> for EnvelopeManager {
             envelope_context.accept();
         } else {
             relay_log::trace!("queueing envelope");
-            context.notify(HandleEnvelope {
+            ProjectCache::from_registry().do_send(ValidateEnvelope::new(
+                project_key,
                 envelope,
                 envelope_context,
-                project_key,
-            });
+            ));
         }
 
// Actual event handling is performed asynchronously in a separate future. The lifetime of
Expand All @@ -452,6 +445,7 @@ impl Handler<QueueEnvelope> for EnvelopeManager {
}
}

/*
/// Handles a queued envelope.
///
/// 1. Ensures the project state is up-to-date and then validates the envelope against the state and
@@ -495,156 +489,67 @@ impl Handler<HandleEnvelope> for EnvelopeManager {
// to being sent to the upstream (including delays in the upstream). This can be regarded
// the total time an envelope spent in this Relay, corrected by incoming network delays.
let processor = self.processor.clone();
// TODO(ja): Implement drop guards for all spawned futures
.drop_guard("process_envelope");
let HandleEnvelope {
Box::new(future)
}
}
*/

/// TODO(ja): Doc
pub struct SubmitEnvelope {
pub envelope: Envelope,
pub envelope_context: EnvelopeContext,
}

impl Message for SubmitEnvelope {
type Result = ();
}

impl Handler<SubmitEnvelope> for EnvelopeManager {
type Result = ();

fn handle(&mut self, message: SubmitEnvelope, context: &mut Self::Context) -> Self::Result {
let SubmitEnvelope {
envelope,
envelope_context,
project_key,
mut envelope_context,
} = message;

let scoping = envelope_context.scoping();
let start_time = envelope.meta().start_time();
let event_id = envelope.event_id();
let project_key = envelope.meta().public_key();

let future = ProjectCache::from_registry()
.send(CheckEnvelope::fetched(
project_key,
envelope,
envelope_context,
))
.map_err(|_| ProcessingError::ScheduleFailed)
.and_then(|result| result.map_err(ProcessingError::ProjectFailed))
.and_then(move |response| {
// Use the project id from the loaded project state to account for redirects.
let project_id = response.scoping.project_id.value();
metric!(set(RelaySets::UniqueProjects) = project_id as i64);

let checked = response.result.map_err(ProcessingError::Rejected)?;
checked.envelope.ok_or(ProcessingError::RateLimited)
})
.and_then(move |(envelope, envelope_context)| {
// get the state for the current project. we can always fetch the cached version
// even if the no_cache flag was passed, as the cache was updated prior in
// `CheckEnvelope`.
ProjectCache::from_registry()
.send(GetProjectState::new(project_key))
.map_err(|_| ProcessingError::ScheduleFailed)
.and_then(|result| result.map_err(ProcessingError::ProjectFailed))
.map(|state| (envelope, envelope_context, state))
})
.and_then(|(envelope, envelope_context, project_state)| {
// get the state for the sampling project.
// TODO: Could this run concurrently with main project cache fetch?
if let Some(sampling_project_key) = utils::get_sampling_key(&envelope) {
let future = ProjectCache::from_registry()
.send(GetProjectState::new(sampling_project_key))
.then(move |response| {
Ok(ProcessEnvelope {
envelope,
envelope_context,
project_state,
// ignore all errors and leave envelope unsampled
sampling_project_state: response.ok().and_then(|r| r.ok()),
})
});

Box::new(future) as ResponseFuture<_, _>
} else {
Box::new(future::ok(ProcessEnvelope {
envelope,
envelope_context,
project_state,
sampling_project_state: None,
}))
self.send_envelope(project_key, envelope, scoping, start_time, context)
.then(move |result| match result {
Ok(_) => {
envelope_context.accept();
Ok(())
}
})
.and_then(move |process_message| {
processor
.send(process_message)
.map_err(|_| ProcessingError::ScheduleFailed)
.flatten()
})
.and_then(move |processed| {
// Processing returned new rate limits. Cache them on the project to avoid expensive
// processing while the limit is active.
let rate_limits = processed.rate_limits;
if rate_limits.is_limited() {
let project_cache = ProjectCache::from_registry();
project_cache.do_send(UpdateRateLimits::new(project_key, rate_limits));
}

processed.envelope.ok_or(ProcessingError::RateLimited)
})
.into_actor(self)
.and_then(move |(envelope, mut envelope_context), slf, ctx| {
let scoping = envelope_context.scoping();
slf.send_envelope(project_key, envelope, scoping, start_time, ctx)
.then(move |result| match result {
Ok(_) => {
envelope_context.accept();
Ok(())
}
Err(error) => {
let outcome = Outcome::Invalid(DiscardReason::Internal);

Err(match error {
#[cfg(feature = "processing")]
SendEnvelopeError::ScheduleFailed => {
envelope_context.reject(outcome);
ProcessingError::ScheduleFailed
}

#[cfg(feature = "processing")]
SendEnvelopeError::StoreFailed(e) => {
envelope_context.reject(outcome);
ProcessingError::StoreFailed(e)
}

SendEnvelopeError::BodyEncodingFailed(e) => {
envelope_context.reject(outcome);
ProcessingError::BodyEncodingFailed(e)
}

SendEnvelopeError::EnvelopeBuildFailed(e) => {
envelope_context.reject(outcome);
ProcessingError::EnvelopeBuildFailed(e)
}

SendEnvelopeError::UpstreamRequestFailed(e) => {
if e.is_received() {
envelope_context.accept();
} else {
envelope_context.reject(outcome);
}

ProcessingError::UpstreamRequestFailed(e)
}
})
Err(error) => {
match error {
SendEnvelopeError::UpstreamRequestFailed(e) if e.is_received() => {
envelope_context.accept()
}
})
.into_actor(slf)
})
.map_err(move |error, slf, context| {
if Capture::should_capture(&slf.config) {
context.notify(Capture::rejected(event_id, &error));
}
_ => envelope_context.reject(Outcome::Invalid(DiscardReason::Internal)),
}

// TODO(ja): Error handling?
// // Errors are only logged for what we consider an internal discard reason. These
// // indicate errors in the infrastructure or implementation bugs. In other cases,
// // we "expect" errors and log them as debug level.
// relay_log::with_scope(
// |scope| scope.set_tag("project_key", project_key),
// || relay_log::error!("error sending envelope: {}", LogError(&error)),
// );

let outcome = error.to_outcome();
if let Some(Outcome::Invalid(DiscardReason::Internal)) = outcome {
// Errors are only logged for what we consider an internal discard reason. These
// indicate errors in the infrastructure or implementation bugs. In other cases,
// we "expect" errors and log them as debug level.
relay_log::with_scope(
|scope| scope.set_tag("project_key", project_key),
|| relay_log::error!("error processing envelope: {}", LogError(&error)),
);
} else {
relay_log::debug!("dropped envelope: {}", LogError(&error));
Err(())
}
})
.drop_guard("process_envelope");

Box::new(future)
// TODO(ja): Capture errors
// TODO(ja): protect futures --> common util?
.into_actor(self)
.spawn(context);
}
}
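
The error handling in the new SubmitEnvelope handler above reduces to one decision: the envelope context is accepted when sending succeeded, or when the upstream already received the request even though it answered with an error; every other failure rejects the context with an internal discard. Below is a small stand-alone sketch of that decision with stand-in types (the real SendEnvelopeError, EnvelopeContext, and Outcome are not reproduced here):

```rust
// Stand-ins for relay-server types; the real SendEnvelopeError has more
// variants and carries error payloads, and the real outcome is recorded on
// an EnvelopeContext rather than returned.
enum SendEnvelopeError {
    UpstreamRequestFailed { received_by_upstream: bool },
    BodyEncodingFailed,
}

#[derive(Debug, PartialEq)]
enum Fate {
    Accepted,
    // Corresponds to Outcome::Invalid(DiscardReason::Internal) in the handler.
    RejectedInternal,
}

fn finish(result: Result<(), SendEnvelopeError>) -> Fate {
    match result {
        // Sent successfully: the envelope is accepted.
        Ok(()) => Fate::Accepted,
        // The upstream received the request even though it answered with an
        // error, so the data is not lost: still accept.
        Err(SendEnvelopeError::UpstreamRequestFailed {
            received_by_upstream: true,
        }) => Fate::Accepted,
        // Everything else counts as an internal problem: reject the envelope.
        Err(_) => Fate::RejectedInternal,
    }
}

fn main() {
    assert_eq!(finish(Ok(())), Fate::Accepted);
    assert_eq!(
        finish(Err(SendEnvelopeError::UpstreamRequestFailed {
            received_by_upstream: true,
        })),
        Fate::Accepted
    );
    assert_eq!(
        finish(Err(SendEnvelopeError::BodyEncodingFailed)),
        Fate::RejectedInternal
    );
}
```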
