Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(server): Prepare for per-item rate limits #615

Merged
merged 1 commit into from
Jun 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions relay-server/src/actors/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ use relay_redis::RedisPool;

use crate::actors::outcome::{DiscardReason, Outcome, OutcomeProducer, TrackOutcome};
use crate::actors::project::{
EventAction, GetEventAction, GetProjectState, GetScoping, Project, ProjectState,
UpdateRateLimits,
CheckEnvelope, GetProjectState, Project, ProjectState, UpdateRateLimits,
};
use crate::actors::project_cache::ProjectError;
use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequestError};
Expand Down Expand Up @@ -1046,7 +1045,6 @@ impl Handler<HandleEnvelope> for EventManager {
let event_id = envelope.event_id();
let project_id = envelope.meta().project_id();
let remote_addr = envelope.meta().client_addr();
let shared_meta = Arc::new(envelope.meta().clone());

// Compute whether this envelope contains an event. This is used in error handling to
// appropriately emit an outecome. Envelopes not containing events (such as standalone
Expand All @@ -1058,27 +1056,26 @@ impl Handler<HandleEnvelope> for EventManager {
metric!(set(RelaySets::UniqueProjects) = project_id.value() as i64);

let future = project
.send(GetScoping::fetched(shared_meta.clone()))
.send(CheckEnvelope::fetched(envelope))
.map_err(ProcessingError::ScheduleFailed)
.and_then(|scoping_result| scoping_result.map_err(ProcessingError::ProjectFailed))
.and_then(clone!(project, scoping, |new_scoping| {
scoping.replace(new_scoping);
project
.send(GetEventAction::new(shared_meta))
.map_err(ProcessingError::ScheduleFailed)
.and_then(|result| result.map_err(ProcessingError::ProjectFailed))
.and_then(clone!(scoping, |response| {
scoping.replace(response.scoping);

let checked = response.result.map_err(ProcessingError::EventRejected)?;
match checked.envelope {
Some(envelope) => Ok(envelope),
None => Err(ProcessingError::RateLimited(checked.rate_limits)),
}
}))
.and_then(|action| match action {
EventAction::Accept => Ok(()),
EventAction::RateLimit(limits) => Err(ProcessingError::RateLimited(limits)),
EventAction::Discard(reason) => Err(ProcessingError::EventRejected(reason)),
})
.and_then(clone!(project, |_| {
.and_then(clone!(project, |envelope| {
project
.send(GetProjectState)
.map_err(ProcessingError::ScheduleFailed)
.and_then(|result| result.map_err(ProcessingError::ProjectFailed))
.map(|state| (envelope, state))
}))
.and_then(move |project_state| {
.and_then(move |(envelope, project_state)| {
processor
.send(ProcessEnvelope {
envelope,
Expand Down
180 changes: 107 additions & 73 deletions relay-server/src/actors/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};

use crate::actors::outcome::DiscardReason;
use crate::actors::project_cache::{FetchProjectState, ProjectCache, ProjectError};
use crate::envelope::Envelope;
use crate::extractors::RequestMeta;
use crate::metrics::RelayCounters;
use crate::utils::Response;
use crate::utils::{ActorResponse, Response};

/// The current status of a project state. Return value of `ProjectState::outdated`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -294,11 +295,18 @@ impl ProjectState {
scoping
}

/// Determines whether the given event should be accepted or dropped.
pub fn get_event_action(&self, meta: &RequestMeta, config: &Config) -> EventAction {
/// Determines whether the given request should be accepted or discarded.
///
/// Returns `Ok(())` if the request should be accepted. Returns `Err(DiscardReason)` if the
/// request should be discarded, by indicating the reason. The checks preformed for this are:
///
/// - Allowed origin headers
/// - Disabled or unknown projects
/// - Disabled project keys (DSN)
pub fn check_request(&self, meta: &RequestMeta, config: &Config) -> Result<(), DiscardReason> {
// Try to verify the request origin with the project config.
if !self.is_valid_origin(meta.origin()) {
return EventAction::Discard(DiscardReason::Cors);
return Err(DiscardReason::Cors);
}

if self.outdated(config) == Outdated::HardOutdated {
Expand All @@ -310,32 +318,32 @@ impl ProjectState {
// except queueing events for unknown DSNs as they might have become
// available in the meanwhile.
match self.get_public_key_status(meta.public_key()) {
PublicKeyStatus::Enabled => EventAction::Accept,
PublicKeyStatus::Disabled => EventAction::Discard(DiscardReason::ProjectId),
PublicKeyStatus::Unknown => EventAction::Accept,
PublicKeyStatus::Enabled => Ok(()),
PublicKeyStatus::Disabled => Err(DiscardReason::ProjectId),
PublicKeyStatus::Unknown => Ok(()),
}
} else {
// if we recorded an invalid project state response from the upstream (i.e. parsing
// failed), discard the event with a s
if self.invalid() {
return EventAction::Discard(DiscardReason::ProjectState);
return Err(DiscardReason::ProjectState);
}

// only drop events if we know for sure the project is disabled.
if self.disabled() {
return EventAction::Discard(DiscardReason::ProjectId);
return Err(DiscardReason::ProjectId);
}

// since the config has been fetched recently, we assume unknown
// public keys do not exist and drop events eagerly. proxy mode is
// an exception, where public keys are backfilled lazily after
// events are sent to the upstream.
match self.get_public_key_status(meta.public_key()) {
PublicKeyStatus::Enabled => EventAction::Accept,
PublicKeyStatus::Disabled => EventAction::Discard(DiscardReason::ProjectId),
PublicKeyStatus::Enabled => Ok(()),
PublicKeyStatus::Disabled => Err(DiscardReason::ProjectId),
PublicKeyStatus::Unknown => match config.relay_mode() {
RelayMode::Proxy => EventAction::Accept,
_ => EventAction::Discard(DiscardReason::ProjectId),
RelayMode::Proxy => Ok(()),
_ => Err(DiscardReason::ProjectId),
},
}
}
Expand Down Expand Up @@ -510,6 +518,37 @@ impl Project {
None => meta.get_partial_scoping(),
}
}

fn check_envelope(
&mut self,
envelope: Envelope,
scoping: &Scoping,
) -> Result<CheckedEnvelope, DiscardReason> {
if let Some(state) = self.state() {
state.check_request(envelope.meta(), &self.config)?;
}

self.rate_limits.clean_expired();

// TODO: Apply rate limits no non-event items once implemented.
let rate_limits = self.rate_limits.check(scoping.item(DataCategory::Error));
let envelope = if rate_limits.is_limited() {
None
} else {
Some(envelope)
};

Ok(CheckedEnvelope {
envelope,
rate_limits,
})
}

fn check_envelope_scoped(&mut self, message: CheckEnvelope) -> CheckEnvelopeResponse {
let scoping = self.get_scoping(message.envelope.meta());
let result = self.check_envelope(message.envelope, &scoping);
CheckEnvelopeResponse { result, scoping }
}
}

impl Actor for Project {
Expand All @@ -524,6 +563,9 @@ impl Actor for Project {
}
}

/// Returns the project state.
///
/// The project state is fetched if it is missing or outdated.
pub struct GetProjectState;

impl Message for GetProjectState {
Expand All @@ -538,89 +580,81 @@ impl Handler<GetProjectState> for Project {
}
}

pub struct GetScoping {
meta: Arc<RequestMeta>,
/// Checks the envelope against project configuration and rate limits.
///
/// When `fetched`, then the project state is ensured to be up to date. When `cached`, an outdated
/// project state may be used, or otherwise the envelope is passed through unaltered.
///
/// To check the envelope, this runs:
/// - Validate origins and public keys
/// - Quotas with a limit of `0`
/// - Cached rate limits
#[derive(Debug)]
pub struct CheckEnvelope {
envelope: Envelope,
fetch: bool,
}

impl GetScoping {
pub fn fetched(meta: Arc<RequestMeta>) -> Self {
Self { meta, fetch: true }
impl CheckEnvelope {
/// Fetches the project state and checks the envelope.
pub fn fetched(envelope: Envelope) -> Self {
Self {
envelope,
fetch: true,
}
}

pub fn cached(meta: Arc<RequestMeta>) -> Self {
Self { meta, fetch: false }
/// Uses a cached project state and checks the envelope.
pub fn cached(envelope: Envelope) -> Self {
Self {
envelope,
fetch: false,
}
}
}

impl Message for GetScoping {
type Result = Result<Scoping, ProjectError>;
/// A checked envelope and associated rate limits.
///
/// Items violating the rate limits have been removed from the envelope. If all items are removed
/// from the envelope, `None` is returned in place of the envelope.
#[derive(Debug)]
pub struct CheckedEnvelope {
pub envelope: Option<Envelope>,
pub rate_limits: RateLimits,
}

/// Scoping information along with a checked envelope.
#[derive(Debug)]
pub struct CheckEnvelopeResponse {
pub result: Result<CheckedEnvelope, DiscardReason>,
pub scoping: Scoping,
}

impl Message for CheckEnvelope {
type Result = Result<CheckEnvelopeResponse, ProjectError>;
}

impl Handler<GetScoping> for Project {
type Result = Response<Scoping, ProjectError>;
impl Handler<CheckEnvelope> for Project {
type Result = ActorResponse<Self, CheckEnvelopeResponse, ProjectError>;

fn handle(&mut self, message: GetScoping, context: &mut Self::Context) -> Self::Result {
fn handle(&mut self, message: CheckEnvelope, context: &mut Self::Context) -> Self::Result {
if message.fetch {
// Project state fetching is allowed, so ensure the state is fetched and up-to-date.
// This will return synchronously if the state is still cached.
self.get_or_fetch_state(context)
.map(move |state| state.get_scoping(&message.meta))
.into_actor()
.map(self, context, move |_, slf, _ctx| {
slf.check_envelope_scoped(message)
})
} else {
self.get_or_fetch_state(context);
// message.fetch == false: Fetching must not block the store request. The EventManager
// will later fetch the project state.
Response::ok(self.get_scoping(&message.meta))
ActorResponse::ok(self.check_envelope_scoped(message))
}
}
}

pub struct GetEventAction {
meta: Arc<RequestMeta>,
}

impl GetEventAction {
pub fn new(meta: Arc<RequestMeta>) -> Self {
GetEventAction { meta }
}
}

/// Indicates what should happen to events based on their meta data.
#[derive(Clone, Debug)]
pub enum EventAction {
/// The event should be discarded.
Discard(DiscardReason),
/// The event should be discarded and the client should back off for some time.
RateLimit(RateLimits),
/// The event should be processed and sent to upstream.
Accept,
}

impl Message for GetEventAction {
type Result = EventAction;
}

impl Handler<GetEventAction> for Project {
type Result = MessageResult<GetEventAction>;

fn handle(&mut self, message: GetEventAction, _context: &mut Self::Context) -> Self::Result {
let scoping = self.get_scoping(&message.meta);
let rate_limits = self.rate_limits.check(scoping.item(DataCategory::Error));

let event_action = if rate_limits.is_limited() {
EventAction::RateLimit(rate_limits)
} else {
// If the state is not loaded, we're probably in a preflight request. `EventManager`
// ensures to load the state before calling this function.
self.state().map_or(EventAction::Accept, |state| {
state.get_event_action(&message.meta, &self.config)
})
};

MessageResult(event_action)
}
}

pub struct UpdateRateLimits(pub RateLimits);

impl Message for UpdateRateLimits {
Expand Down
Loading