feat(sampling): Reservoir sampling #2550
Conversation
let sampling_base_value = match self.sampling_value {
    SamplingValue::SampleRate { value } => value,
    SamplingValue::Factor { value } => value,
    SamplingValue::Reservoir { limit } => {
Ugly: we return `SamplingValue::Reservoir { limit }` even though the limit no longer has any meaning past this point. It was the easiest solution; I will refactor it in the future.
}

impl SamplingValue {
    pub(crate) fn value(&self) -> f64 {
This function no longer makes sense, because the reservoir limit doesn't have a value analogous to these.
}

/// Returns the updated [`SamplingValue`] if it's valid.
pub fn evaluate(
Renamed the `sample_rate` function.
relay-sampling/src/evaluation.rs (outdated)
#[cfg(feature = "redis")]
redis_pool: Option<Arc<RedisPool>>,
#[cfg(feature = "redis")]
org_id: Option<u64>,
Ideally this would be a single `Option`, since we only care about having both or neither. But a new type is overkill, and a tuple might be messy.
We always have access to the org id, but I opted to put it behind an `Option` rather than initialize it with an invalid value, even though it wouldn't matter for code execution.
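For illustration, the two shapes being weighed might look roughly like this; the struct and field names are assumptions, and the combined variant is the hypothetical alternative, not what the PR does:

```rust
use std::sync::Arc;

// Placeholder so the sketch is self-contained.
struct RedisPool;

// What the PR does: two parallel options that are expected to be both Some or both None.
struct EvaluatorWithTwoOptions {
    redis_pool: Option<Arc<RedisPool>>,
    org_id: Option<u64>,
}

// The "one option" alternative: both-or-neither is encoded in the type itself.
struct EvaluatorWithOneOption {
    redis: Option<(Arc<RedisPool>, u64)>,
}
```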
@@ -93,6 +207,9 @@ impl SamplingEvaluator {
        self.rule_ids,
    ));
}
SamplingValue::Reservoir { .. } => {
    return ControlFlow::Break(SamplingMatch::new(1.0, seed, vec![rule.id]));
We only take the last rule id in the reservoir case because a reservoir match overrides all previously matched rules.
@@ -565,6 +573,8 @@ impl EnvelopeProcessorService {
});

let inner = InnerProcessor {
    #[cfg(feature = "processing")]
    redis_pool: _redis.clone().map(Arc::new),
    #[cfg(feature = "processing")]
    rate_limiter: _redis
        .map(|pool| RedisRateLimiter::new(pool).max_limit(config.max_rate_limit())),
Thoughts on passing the `RedisRateLimiter` the same `Arc` as in the `redis_pool` field, in a follow-up PR?
    return;
};

if let Ok(mut guard) = self.reservoir_counters.try_lock() {
Using `try_lock` comes with the downside of skipping cleanup in case the rules are in use right now. That is an unlikely case, so it's a fair assumption, but please leave a code comment explaining why this is so crucial, to prevent someone from changing this into a full `lock()` in the future.
The only way to block on the lock here would be a tokio Mutex, but before we do that we should explore other approaches.
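A sketch of the kind of comment being asked for around the `try_lock` call; the struct, field, and method names here are assumptions, not the actual code:

```rust
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Mutex;

// Hypothetical stand-in for relay's rule identifier.
type RuleId = u32;

struct ProjectState {
    reservoir_counters: Mutex<BTreeMap<RuleId, i64>>,
}

impl ProjectState {
    /// Drops counters for reservoir rules that are no longer in the sampling config.
    fn remove_expired_reservoir_counters(&self, active_rules: &BTreeSet<RuleId>) {
        // NOTE: `try_lock` is intentional; we must not block here (the only way to
        // properly wait on this lock would be a tokio Mutex). If an evaluator
        // currently holds the lock we skip this round of cleanup - an unlikely case -
        // and stale entries are removed on a later config update. Do not change this
        // into a full `lock()`.
        if let Ok(mut counters) = self.reservoir_counters.try_lock() {
            counters.retain(|rule, _| active_rules.contains(rule));
        }
    }
}
```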
relay-sampling/src/evaluation.rs (outdated)
if redis_sampling::set_redis_expiry(&mut redis_connection, &key, rule_expiry).is_err() {
    relay_log::error!("failed to set redis reservoir rule expiry");
}
@jan-auer I wasn't sure what we would do if this returned an error, so I opted to just set the expiry every time, as we discussed; we can think about optimizing later.
We could log and ignore the error on EXPIRE like you do here, and handle the increment error gracefully. The more correct version, however, would be to encapsulate the two calls into a single function that returns an error, and if the error occurs, simply not match the rule.
That would mean that if Redis is unavailable, we skip all reservoir rules and apply the other matching rules: a graceful and predictable fallback behavior that does not cause excess indexing.
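One way to sketch that encapsulation, using the plain `redis` crate API for illustration; the function name and the `EXPIREAT` choice are assumptions, not the actual helper in the PR:

```rust
use chrono::{DateTime, Utc};
use redis::{Connection, RedisResult};

/// Increments the reservoir counter and refreshes its expiry as one fallible operation.
/// Any Redis error bubbles up so the caller can simply not match the reservoir rule.
fn increment_reservoir_count(
    connection: &mut Connection,
    key: &str,
    rule_expiry: &DateTime<Utc>,
) -> RedisResult<i64> {
    let count: i64 = redis::cmd("INCR").arg(key).query(connection)?;
    redis::cmd("EXPIREAT")
        .arg(key)
        .arg(rule_expiry.timestamp())
        .query::<()>(connection)?;
    Ok(count)
}
```

The caller would then treat any `Err` as "do not match this rule" and fall through to the remaining `SampleRate` and `Factor` rules.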
OK. There's an edge case if incrementing works but setting the Redis expiry doesn't, but that should be very unlikely. If it does happen, though, we'll count up without sampling.
relay-sampling/src/evaluation.rs (outdated)
}

/// Evaluates a reservoir rule, returning `true` if it should be sampled.
pub fn evaluate(&self, rule: RuleId, limit: i64, rule_expiry: Option<&DateTime<Utc>>) -> bool {
This function should satisfy all the performance optimizations mentioned (see the sketch below):
- It gets the local count and increments it if the limit hasn't been reached.
- It returns fast if the count is above the limit.
- If Redis isn't configured, we return early.
- If Redis is configured but the value received is less than the local value, we avoid locking again.
- Only if the value we receive from Redis is higher do we update the local count by locking the mutex again.
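A condensed sketch of that flow; the type, the field names, and the Redis helper are simplified assumptions rather than the actual implementation (which also increments the Redis counter on that round trip):

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;

// Hypothetical stand-ins so the sketch is self-contained.
type RuleId = u32;

/// Pretend Redis round trip: returns the global count, or `None` if Redis isn't configured.
fn global_count_from_redis(_rule: RuleId) -> Option<i64> {
    None
}

struct ReservoirEvaluator {
    counters: Mutex<BTreeMap<RuleId, i64>>,
}

impl ReservoirEvaluator {
    /// Returns `true` if the reservoir rule should sample this event.
    fn evaluate(&self, rule: RuleId, limit: i64) -> bool {
        // 1. Get the local count and increment it only if the limit hasn't been reached.
        let (local, incremented) = {
            let Ok(mut guard) = self.counters.lock() else { return false };
            let entry = guard.entry(rule).or_insert(0);
            let incremented = *entry < limit;
            if incremented {
                *entry += 1;
            }
            (*entry, incremented)
        };

        // 2. Fast return if the limit was already reached locally.
        if !incremented {
            return false;
        }

        // 3. If Redis isn't configured, return early: the local count decides.
        let Some(global) = global_count_from_redis(rule) else { return true };

        // 4. Only when the value received from Redis is higher than the local one
        //    do we lock the mutex again to update the local count.
        if global > local {
            if let Ok(mut guard) = self.counters.lock() {
                guard.insert(rule, global);
            }
            return global <= limit;
        }

        true
    }
}
```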
relay-sampling/src/evaluation.rs (outdated)
}

/// Gets the local count of a reservoir rule. Increments the count if the limit isn't reached.
fn local_count(&self, rule: RuleId, limit: i64) -> Option<i64> {
Not sure what's worse: returning an `Option` when it should be a `Result`, or having `LockResult<MutexGuard<'_, BTreeMap<RuleId, i64>>>` in the signature.
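For illustration, the two signatures being weighed might look like this (simplified; the real `local_count` also handles incrementing the count):

```rust
use std::collections::BTreeMap;
use std::sync::{LockResult, Mutex, MutexGuard};

// Hypothetical stand-in for relay's rule identifier.
type RuleId = u32;

struct Counters {
    inner: Mutex<BTreeMap<RuleId, i64>>,
}

impl Counters {
    // Option A (what the PR does): hide the lock failure behind an `Option`.
    fn local_count(&self, rule: RuleId) -> Option<i64> {
        let guard = self.inner.lock().ok()?;
        Some(guard.get(&rule).copied().unwrap_or(0))
    }

    // Option B: put the `LockResult` in the signature and let the caller handle it.
    fn counts(&self) -> LockResult<MutexGuard<'_, BTreeMap<RuleId, i64>>> {
        self.inner.lock()
    }
}
```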
A follow-up to #2550. The main motivation for this PR is that the `SamplingRule::evaluate` method became pretty ugly because it relied on both variants of `SamplingValue` having a sample rate attached, which is no longer the case with the introduction of the `Reservoir` variant. The function has mainly been replaced by `SamplingEvaluator::try_compute_sample_rate`, whose job is to validate the rule and, if it's valid, return a sample rate. For the `SampleRate` and `Factor` variants, validation means the rule is not out of bounds of the given time range (depending on the decaying function); for the `Reservoir` variant it means the limit has not been exceeded. It returns an optional `ControlFlow`, where `None` means the rule is invalid and should be skipped, `Break` is analogous to `SamplingValue::SampleRate` (but includes reservoir), and `Continue` is analogous to `SamplingValue::Factor`.

Other adjustments made in this PR (this should be all the changes):

* Created the `try_compute_sample_rate` method as described.
* Moved the decaying-function logic to its own method.
* Moved the check of time-range constraints to the beginning of the loop, so we don't do the more expensive condition matching first.
* Updated tests.

Co-authored-by: Jan Michael Auer <mail@jauer.org>
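A sketch of the `Option<ControlFlow>` shape described in this follow-up; the enum and the arithmetic are simplified stand-ins for illustration, not the exact types in the PR:

```rust
use std::ops::ControlFlow;

// Hypothetical, simplified stand-in for `SamplingValue`.
enum Value {
    SampleRate { value: f64 },
    Factor { value: f64 },
    Reservoir { limit_reached: bool },
}

/// `None` -> the rule is invalid and is skipped.
/// `Some(Break(rate))` -> a final sample rate (SampleRate-like, including reservoir).
/// `Some(Continue(factor))` -> an accumulated factor carried to the next rule (Factor-like).
fn try_compute_sample_rate(accumulated_factor: f64, value: &Value) -> Option<ControlFlow<f64, f64>> {
    match value {
        Value::SampleRate { value } => Some(ControlFlow::Break(value * accumulated_factor)),
        Value::Factor { value } => Some(ControlFlow::Continue(value * accumulated_factor)),
        // A reservoir rule whose limit has been reached is invalid and gets skipped.
        Value::Reservoir { limit_reached: true } => None,
        // Otherwise the reservoir rule forces sampling at rate 1.0.
        Value::Reservoir { limit_reached: false } => Some(ControlFlow::Break(1.0)),
    }
}
```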
Relay implementation of the reservoir project: getsentry/sentry#54449
Reservoir bias uses a type of `SamplingRule` which samples all matches until a certain limit has been reached. This limit is tracked both locally on each Relay and, for processing Relays, with a globally synchronized counter in Redis. When available, the Redis counter updates the local counter.

The counters are stored on the `Project` struct as a `Mutex<BTreeMap<RuleId, i64>>`. When we send an envelope for processing, we pass its project's counters along in the `ProcessEnvelopeState` to the `EnvelopeProcessorService`.

There, in the `dynamic-sampling` crate, we introduce a `ReservoirEvaluator` which, when a reservoir rule matches, checks whether the limit has been reached, using either the local counters we sent or, if applicable, the global Redis count. The `ReservoirEvaluator` also takes care of updating both Redis and the local counter.

After the limit is reached, the rule is no longer valid and is ignored, so that the normal `SampleRate` and `Factor` variants of `SamplingValue` apply. Sentry is responsible for removing the reservoir rule from the `SamplingConfig` when it has reached its limit.

Whenever we receive a new `ProjectConfig`, we remove all of that project's reservoir counters that are no longer in the `DynamicSamplingConfig`.
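A minimal sketch of the shared counter storage and handoff described above; the type alias and struct fields are assumptions based on this description rather than the exact code:

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for relay's rule identifier.
type RuleId = u32;

/// The counters live on the `Project` and are shared with the envelope processor.
type ReservoirCounters = Arc<Mutex<BTreeMap<RuleId, i64>>>;

struct Project {
    reservoir_counters: ReservoirCounters,
}

/// Simplified stand-in for `ProcessEnvelopeState`: the counters travel with the envelope.
struct ProcessEnvelopeState {
    reservoir_counters: ReservoirCounters,
}

impl Project {
    /// Cloning the `Arc` lets the `ReservoirEvaluator` in the processor read and update
    /// the same counters that the project later prunes on a new `ProjectConfig`.
    fn process_state(&self) -> ProcessEnvelopeState {
        ProcessEnvelopeState {
            reservoir_counters: Arc::clone(&self.reservoir_counters),
        }
    }
}
```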
Regarding the use of the mutex: we use `try_lock` to avoid getting blocked in case the mutex is already in use. There are two reasons it might be blocked.