Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(server): Simplify CheckEnvelope #1429

Merged
merged 1 commit into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 9 additions & 30 deletions relay-server/src/actors/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ use relay_statsd::metric;
use crate::actors::outcome::{DiscardReason, Outcome};
use crate::actors::processor::{EnvelopeProcessor, ProcessEnvelope};
use crate::actors::project_cache::{
AddSamplingState, CheckEnvelopeResponse, CheckedEnvelope, ProjectCache, ProjectError,
UpdateProjectState,
AddSamplingState, CheckedEnvelope, ProjectCache, ProjectError, UpdateProjectState,
};
use crate::envelope::Envelope;
use crate::extractors::RequestMeta;
Expand Down Expand Up @@ -699,7 +698,7 @@ impl Project {
envelope_context: EnvelopeContext,
project_state: Arc<ProjectState>,
) {
if let Ok(checked) = self.check_envelope(envelope, envelope_context).result {
if let Ok(checked) = self.check_envelope(envelope, envelope_context) {
if let Some((envelope, envelope_context)) = checked.envelope {
let process = ProcessEnvelope {
envelope,
Expand Down Expand Up @@ -817,25 +816,18 @@ impl Project {
})
}

/// Amends request `Scoping` with information from this project state.
///
/// If the project state is loaded, information from the project state is merged into the
/// request's scoping. Otherwise, this function returns partial scoping from the `request_meta`.
/// See [`RequestMeta::get_partial_scoping`] for more information.
fn scope_request(&self, meta: &RequestMeta) -> Scoping {
match self.valid_state() {
Some(state) => state.scope_request(meta),
None => meta.get_partial_scoping(),
}
}

fn check_envelope_scoped(
pub fn check_envelope(
&mut self,
mut envelope: Envelope,
mut envelope_context: EnvelopeContext,
) -> Result<CheckedEnvelope, DiscardReason> {
let state = self.valid_state();
if let Some(state) = state.as_deref() {
let mut scoping = envelope_context.scoping();

if let Some(ref state) = state {
scoping = state.scope_request(envelope.meta());
envelope_context.scope(scoping);

if let Err(reason) = state.check_request(envelope.meta(), &self.config) {
envelope_context.reject(Outcome::Invalid(reason));
return Err(reason);
Expand All @@ -849,7 +841,6 @@ impl Project {
Ok(self.rate_limits.check_with_quotas(quotas, item_scoping))
});

let scoping = envelope_context.scoping();
let (enforcement, rate_limits) = envelope_limiter.enforce(&mut envelope, &scoping)?;
enforcement.track_outcomes(&envelope, &scoping);
envelope_context.update(&envelope);
Expand All @@ -867,18 +858,6 @@ impl Project {
rate_limits,
})
}

pub fn check_envelope(
&mut self,
envelope: Envelope,
mut envelope_context: EnvelopeContext,
) -> CheckEnvelopeResponse {
let scoping = self.scope_request(envelope.meta());
envelope_context.scope(scoping);

let result = self.check_envelope_scoped(envelope, envelope_context);
CheckEnvelopeResponse { result, scoping }
}
}

impl Drop for Project {
Expand Down
15 changes: 4 additions & 11 deletions relay-server/src/actors/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures01::{future, Future};
use relay_common::ProjectKey;
use relay_config::{Config, RelayMode};
use relay_metrics::{self, AggregateMetricsError, Bucket, FlushBuckets, Metric};
use relay_quotas::{RateLimits, Scoping};
use relay_quotas::RateLimits;
use relay_redis::RedisPool;
use relay_statsd::metric;

Expand Down Expand Up @@ -434,19 +434,12 @@ pub struct CheckedEnvelope {
pub rate_limits: RateLimits,
}

/// Scoping information along with a checked envelope.
#[derive(Debug)]
pub struct CheckEnvelopeResponse {
pub result: Result<CheckedEnvelope, DiscardReason>,
pub scoping: Scoping,
}

impl Message for CheckEnvelope {
type Result = Result<CheckEnvelopeResponse, ProjectError>;
type Result = Result<CheckedEnvelope, DiscardReason>;
}

impl Handler<CheckEnvelope> for ProjectCache {
type Result = Result<CheckEnvelopeResponse, ProjectError>;
type Result = Result<CheckedEnvelope, DiscardReason>;

fn handle(&mut self, message: CheckEnvelope, _: &mut Self::Context) -> Self::Result {
let project = self.get_or_create_project(message.project_key);
Expand All @@ -456,7 +449,7 @@ impl Handler<CheckEnvelope> for ProjectCache {
// a full reload. Fetching must not block the store request.
project.get_or_fetch_state(false);

Ok(project.check_envelope(message.envelope, message.context))
project.check_envelope(message.envelope, message.context)
}
}

Expand Down
17 changes: 2 additions & 15 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use relay_statsd::metric;

use crate::actors::envelopes::{EnvelopeManager, QueueEnvelope, QueueEnvelopeError};
use crate::actors::outcome::{DiscardReason, Outcome};
use crate::actors::project_cache::{CheckEnvelope, ProjectCache, ProjectError};
use crate::actors::project_cache::{CheckEnvelope, ProjectCache};
use crate::envelope::{AttachmentType, Envelope, EnvelopeError, ItemType, Items};
use crate::extractors::RequestMeta;
use crate::service::{ServiceApp, ServiceState};
Expand All @@ -35,9 +35,6 @@ pub enum BadStoreRequest {
#[fail(display = "could not schedule event processing")]
ScheduleFailed,

#[fail(display = "failed to fetch project information")]
ProjectFailed(#[cause] ProjectError),

#[fail(display = "empty request body")]
EmptyBody,

Expand Down Expand Up @@ -101,15 +98,6 @@ impl ResponseError for BadStoreRequest {
.header(utils::RATE_LIMITS_HEADER, rate_limits_header)
.json(&body)
}
BadStoreRequest::ProjectFailed(project_error) => match project_error {
ProjectError::FetchFailed => {
// This particular project is somehow broken. We could treat this as 503 but it's
// more likely that the error is local to this project.
HttpResponse::InternalServerError().json(&body)
}
ProjectError::ScheduleFailed => HttpResponse::ServiceUnavailable().json(&body),
},

BadStoreRequest::ScheduleFailed | BadStoreRequest::QueueFailed(_) => {
// These errors indicate that something's wrong with our actor system, most likely
// mailbox congestion or a faulty shutdown. Indicate an unavailable service to the
Expand Down Expand Up @@ -328,11 +316,10 @@ where
ProjectCache::from_registry()
.send(CheckEnvelope::new(project_key, envelope, envelope_context))
.map_err(|_| BadStoreRequest::ScheduleFailed)
.and_then(|result| result.map_err(BadStoreRequest::ProjectFailed))
})
.and_then(clone!(config, |response| {
// Skip over queuing and issue a rate limit right away
let checked = response.result.map_err(BadStoreRequest::EventRejected)?;
let checked = response.map_err(BadStoreRequest::EventRejected)?;
let (envelope, mut envelope_context) = match checked.envelope {
Some(tuple) => tuple,
None => return Err(BadStoreRequest::RateLimited(checked.rate_limits)),
Expand Down