-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ref(server): Localize outcome emission in EnvelopeContext #1406
Conversation
/// | ||
/// This does not send outcomes for empty envelopes or request-only contexts. | ||
pub fn send_outcomes(&self, outcome: Outcome) { | ||
pub fn reject(&mut self, outcome: Outcome) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the reason this takes &mut self
instead of mut self
? I assume this is also the reason you end up with needing the a bit awkward self.done
flag? It's a bit unfortunate as it breaks the RAII convention.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, there are several places in the code base where we need to pass the envelope context through callbacks such as map_err()
or hold it on a struct such as ProcessEnvelopeState
. Right now, having this &mut
is still more ergonomic and allows to keep most of that code the same for this PR.
There will be a follow-up PR that refactors these places as we'll start to tie envelopes and the context completely together. Hopefully, when we're done with the transition, this can consume self
again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with @flub here, but with the if self.done
in place, I think we can leave it until the next refactor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some pointers for reviewers. A lot of this information will go in the PR description, but please let me know if you would prefer some of these notes in code comments.
// The envelope has been split, so we need to fork the context. | ||
envelope_context.update(&envelope); | ||
let event_context = EnvelopeContext::from_envelope(&event_envelope); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This construct is a bit ugly. From one envelope we create two, and thus we also need to split the EnvelopeContext
. This is achieved by creating a new one for the split Envelope, and updating the old one with remaining items.
I'm thinking that we might want to lift the split
method -- an odd function to begin with -- to the EnvelopeContext and maintain all modification to the envelope there. I haven't found a nice way to do that just yet, so I'm leaving this for a follow-up PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, envelope_context.split_by
would be nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree that moving split
into the context here is probably the right way, but fine as a followup.
if envelope.is_empty() { | ||
// The envelope can be empty here if it contained only metrics items which were removed | ||
// above. In this case, the envelope was accepted and needs no further queueing. | ||
envelope_context.accept(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This accept is actually correct. We only call this message if the envelope was non-empty. If the envelope is now empty, it means that all the items have been handled individually, and the envelope is done.
This mostly matters because accept()
internally logs some success metrics.
The only case that can trigger this at the moment is a metrics-only envelope.
} = message; | ||
|
||
let sampling_project_key = utils::get_sampling_key(&envelope); | ||
|
||
let start_time = envelope.meta().start_time(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To clean up the messages a bit already, the start time has been removed from HandleEnvelope
and can instead be pulled from the Envelope's meta. This is is also where handle_store_like_request
obtains it from.
CheckEnvelope::fetched(project_key, envelope), | ||
*envelope_context.clone().borrow(), | ||
) | ||
.send(CheckEnvelope::fetched( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
send_tracked
is gone completely. If the envelope and its context is dropped for any reason, we log an "internal"
outcome automatically, now. This will also cover more cases that were not instrumented yet.
Same applies to a couple of map_err
calls below that intercepted project errors and mapped them to "internal"
.
|
||
match checked.envelope { | ||
Some(envelope) => { | ||
envelope_context.update(&envelope); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updating scoping and the context is now handled inside the CheckEnvelope
and ProcessEnvelope
messages. The only thing that remains in the EnvelopeManager
is to propagate errors for control flow.
}, | ||
}) | ||
.unwrap(); | ||
let new_envelope = relay_test::with_system(move || { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests are largely unaltered and simply updated to match the new signatures. Only exception is this test, which now requires an actix system since it will log outcomes internally. The outcomes do not matter for this test.
@@ -344,85 +299,65 @@ where | |||
let project_key = meta.public_key(); | |||
let start_time = meta.start_time(); | |||
let config = request.state().config(); | |||
|
|||
let envelope_context = Rc::new(RefCell::new(EnvelopeContext::from_request(&meta))); | |||
let event_id = Rc::new(RefCell::new(None)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The EnvelopeContext
is now moved through the chain of futures rather than shared across all the callbacks. However, we still need the EventId in the global error handler, so we can't get rid of the RefCell
just yet.
if envelope.is_empty() { | ||
// envelope is empty, cannot send outcomes | ||
envelope_context.reject(Outcome::Invalid(DiscardReason::EmptyEnvelope)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of the rejections have been pushed deep into message handlers, this one being a notable exception. Since the envelope is actually dropped here, we also need to reject and drop the context.
QueueEnvelopeError::TooManyEnvelopes => Outcome::Invalid(DiscardReason::Internal), | ||
}, | ||
BadStoreRequest::ProjectFailed(project_error) => match project_error { | ||
ProjectError::FetchFailed => Outcome::Invalid(DiscardReason::ProjectState), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interestingly, this was mapped to DiscardReason::ProjectState
while it was mapped to DiscardReason::Internal
in the envelope manager's future. There's little point in differentiating between those failure modes in outcomes, so I went with "internal" just like everywhere else, allowing to simplify logic.
/// Resets inner state to ensure there's no more logging. | ||
fn finish(&mut self, counter: RelayCounters) { | ||
relay_statsd::metric!(counter(counter) += 1); | ||
relay_statsd::metric!(timer(RelayTimers::EnvelopeTotalTime) = self.start_time.elapsed()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that these metrics are now also logged for envelopes that are rejected in the endpoint (handle_store_like_request
). This is actually more truthful.
// The envelope has been split, so we need to fork the context. | ||
envelope_context.update(&envelope); | ||
let event_context = EnvelopeContext::from_envelope(&event_envelope); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, envelope_context.split_by
would be nice.
state.envelope_context.update(&state.envelope); | ||
|
||
let envelope_response = if state.envelope.is_empty() { | ||
// Individual rate limits have already been issued | ||
state.envelope_context.reject(Outcome::RateLimited(None)); | ||
None | ||
} else { | ||
Some((state.envelope, state.envelope_context)) | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: If the intention is to reject without emitting an outcome, should we introduce an explicit argument / method for that? E.g.
.reject(None)
// or
.reject_empty()
enforcement.track_outcomes(&envelope, scoping); | ||
let scoping = envelope_context.scoping(); | ||
let (enforcement, rate_limits) = envelope_limiter.enforce(&mut envelope, &scoping)?; | ||
enforcement.track_outcomes(&envelope, &scoping); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will we move this to reject
as well at some point?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, or providing a safer reject_item
method that removes an item, logs the outcome, and updates the context in the same go. That could then be used by the rate limiter.
/// | ||
/// This does not send outcomes for empty envelopes or request-only contexts. | ||
pub fn send_outcomes(&self, outcome: Outcome) { | ||
pub fn reject(&mut self, outcome: Outcome) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with @flub here, but with the if self.done
in place, I think we can leave it until the next refactor.
/// | ||
/// This envelope context should be updated using [`update`](Self::update) soon after this | ||
/// operation to ensure that subsequent outcomes are consistent. | ||
pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we could get rid of the pub
here? As far as I can see process_profiles
is the only caller, I guess we need it there because we emit outcomes for individual profiles?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will actually have to call this in more places, at least:
- For sessions. Right now, we're dropping sessions without logging outcomes.
- For the rate limiter once
enforcement.track_outcomes
is migrated to the context.
This is definitely a somewhat unsafe API in the interim.
// The envelope has been split, so we need to fork the context. | ||
envelope_context.update(&envelope); | ||
let event_context = EnvelopeContext::from_envelope(&event_envelope); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree that moving split
into the context here is probably the right way, but fine as a followup.
let outcome = Outcome::Invalid(DiscardReason::Internal); | ||
|
||
match error { | ||
Err(match error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
totally a nitpicking: but I dislike round brackets spanning this many lines, I'd always assign this and then put Err()
around the variable on a next line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. This entire match statement should probably move to a function, or could be simplified by changing the error type to contain the SendError.
Since this is going to move soon anyway, I'll play with the best version of this in the follow-up.
/// Returns the instant at which the envelope was received at this Relay. | ||
/// | ||
/// This is the monotonic time equivalent to [`received_at`](Self::received_at). | ||
pub fn start_time(&self) -> Instant { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
naming is hard, but maybe hungarian naming is defensible in this case: .received_instant()
? currently these two names are just so far apart.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. So far the entire code base consistently calls them start_time
and received_at
because of historical context. If it's OK, I'd do a sweep in a dedicated PR for this.
I think this one will have the most impact |
This change makes
EnvelopeContext
responsible for tracking the lifetime of an envelope and emitting outcomes. In a follow-up, the goal is to remove the long-running handler future inEnvelopeManager
, and instead pass envelopes through separate queues the ingestion stages. Most importantly, this requires to have a central place that is bound to the envelope and emits outcomes and metrics when envelopes are dropped for any reason.EnvelopeContext
Before this change,
EnvelopeContext
was a data object containing all information to create outcomes. It is primarily used in three places:The envelope context is now passed alongside the envelope through Relay and also moves through message handlers. This can ensure that outcomes are guaranteed when the envelope is dropped. It can be dropped in two ways:
accept()
orreject(outcome)
."internal"
outcome.Calling
accept()
consumes the envelope context. Temporarily,reject()
does not yet consume, which will be changed in a follow-up.Control Flow
The flow of envelopes remains unchanged, with the envelope context now following the envelope through every step:
(in the endpoint)
(in envelope manager)
Follow-Ups
active_envelopes
) in theEnvelopeManager
.#skip-changelog