Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spooler): Add capacity check for the envelope buffer #3925

Merged
merged 24 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@

- Add `EnvelopeStore` trait and implement `DiskUsage` for tracking disk usage. ([#3925](https://github.com/getsentry/relay/pull/3925))

**Features**:

- Add configuration option to specify the instance type of Relay. ([#3938](https://github.com/getsentry/relay/pull/3938))
- Update definitions for user agent parsing. ([#3951](https://github.com/getsentry/relay/pull/3951))

## 24.8.0

**Bug Fixes**:
Expand Down
6 changes: 3 additions & 3 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub fn event_id_from_items(items: &Items) -> Result<Option<EventId>, BadStoreReq
///
/// Queueing can fail if the queue exceeds `envelope_buffer_size`. In this case, `Err` is
/// returned and the envelope is not queued.
async fn queue_envelope(
fn queue_envelope(
state: &ServiceState,
mut managed_envelope: ManagedEnvelope,
) -> Result<(), BadStoreRequest> {
Expand Down Expand Up @@ -307,7 +307,7 @@ async fn queue_envelope(

match state.envelope_buffer() {
Some(buffer) => {
if !buffer.has_capacity().await {
if !buffer.has_capacity() {
return Err(BadStoreRequest::QueueFailed);
}

Expand Down Expand Up @@ -388,7 +388,7 @@ pub async fn handle_envelope(
return Err(BadStoreRequest::Overflow(offender));
}

queue_envelope(state, managed_envelope).await?;
queue_envelope(state, managed_envelope)?;

if checked.rate_limits.is_limited() {
// Even if some envelope items have been queued, there might be active rate limits on
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ where
);
}

/// Returns `true` if the underlying storage has the capacity to store more envelopes.
pub fn has_capacity(&self) -> bool {
self.stack_provider.has_store_capacity()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
use crate::services::buffer::envelope_store::EnvelopeStore;
use crate::statsd::RelayCounters;

/// An error returned when doing an operation on [`SqliteEnvelopeStack`].
Expand Down
41 changes: 0 additions & 41 deletions relay-server/src/services/buffer/envelope_store/mod.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1 @@
use std::future::Future;

use hashbrown::HashSet;
use relay_base_schema::project::ProjectKey;

use crate::Envelope;

pub mod sqlite;

/// Trait that models a store of [`Envelope`]s.
pub trait EnvelopeStore {
/// The type that is inserted in the store.
type Envelope;

/// The error type that is returned when an error occurs in the store.
type Error;

/// Inserts one or more envelopes into the store.
fn insert_many(
&mut self,
envelopes: impl IntoIterator<Item = Self::Envelope>,
) -> impl Future<Output = Result<(), Self::Error>>;

/// Deletes one or more envelopes that match `own_key` and `sampling_key` up to `limit` from
/// the store.
fn delete_many(
&mut self,
own_key: ProjectKey,
sampling_key: ProjectKey,
limit: i64,
) -> impl Future<Output = Result<Vec<Box<Envelope>>, Self::Error>>;

/// Returns a set of project key pairs, representing all the unique combinations of
/// `own_key` and `project_key` that are found in the store.
#[allow(dead_code)]
fn project_key_pairs(
&self,
) -> impl Future<Output = Result<HashSet<(ProjectKey, ProjectKey)>, Self::Error>>;

/// Returns the usage of the store where the definition of usage depends on the implementation.
fn usage(&self) -> u64;
}
23 changes: 9 additions & 14 deletions relay-server/src/services/buffer/envelope_store/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::time::Duration;

use crate::envelope::EnvelopeError;
use crate::extractors::StartTime;
use crate::services::buffer::envelope_store::EnvelopeStore;
use crate::statsd::RelayGauges;
use crate::Envelope;
use futures::stream::StreamExt;
Expand Down Expand Up @@ -294,18 +293,12 @@ impl SqliteEnvelopeStore {

Ok(())
}
}

impl EnvelopeStore for SqliteEnvelopeStore {
type Envelope = InsertEnvelope;

type Error = SqliteEnvelopeStoreError;

/// Inserts one or more envelopes into the database.
async fn insert_many(
pub async fn insert_many(
&mut self,
envelopes: impl IntoIterator<Item = Self::Envelope>,
) -> Result<(), Self::Error> {
envelopes: impl IntoIterator<Item = InsertEnvelope>,
) -> Result<(), SqliteEnvelopeStoreError> {
if let Err(err) = build_insert_many_envelopes(envelopes.into_iter())
.build()
.execute(&self.db)
Expand All @@ -323,12 +316,12 @@ impl EnvelopeStore for SqliteEnvelopeStore {
}

/// Deletes and returns at most `limit` [`Envelope`]s from the database.
async fn delete_many(
pub async fn delete_many(
&mut self,
own_key: ProjectKey,
sampling_key: ProjectKey,
limit: i64,
) -> Result<Vec<Box<Envelope>>, Self::Error> {
) -> Result<Vec<Box<Envelope>>, SqliteEnvelopeStoreError> {
let envelopes = build_delete_and_fetch_many_envelopes(own_key, sampling_key, limit)
.fetch(&self.db)
.peekable();
Expand Down Expand Up @@ -386,7 +379,9 @@ impl EnvelopeStore for SqliteEnvelopeStore {

/// Returns a set of project key pairs, representing all the unique combinations of
/// `own_key` and `project_key` that are found in the database.
async fn project_key_pairs(&self) -> Result<HashSet<(ProjectKey, ProjectKey)>, Self::Error> {
pub async fn project_key_pairs(
&self,
) -> Result<HashSet<(ProjectKey, ProjectKey)>, SqliteEnvelopeStoreError> {
let project_key_pairs = build_get_project_key_pairs()
.fetch_all(&self.db)
.await
Expand All @@ -402,7 +397,7 @@ impl EnvelopeStore for SqliteEnvelopeStore {
}

/// Returns an approximate measure of the used size of the database.
fn usage(&self) -> u64 {
pub fn usage(&self) -> u64 {
self.disk_usage.usage()
}
}
Expand Down
37 changes: 32 additions & 5 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Types for buffering envelopes.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;

use relay_base_schema::project::ProjectKey;
Expand All @@ -10,11 +10,13 @@ use tokio::sync::MutexGuard;
use crate::envelope::Envelope;
use crate::utils::{ManagedEnvelope, MemoryChecker};

use crate::statsd::RelayCounters;
pub use envelope_buffer::EnvelopeBufferError;
pub use envelope_buffer::PolymorphicEnvelopeBuffer;
pub use envelope_stack::sqlite::SqliteEnvelopeStack; // pub for benchmarks
pub use envelope_stack::EnvelopeStack; // pub for benchmarks
pub use envelope_store::sqlite::SqliteEnvelopeStore; // pub for benchmarks // pub for benchmarks
pub use envelope_store::sqlite::SqliteEnvelopeStore;
// pub for benchmarks // pub for benchmarks

mod envelope_buffer;
mod envelope_stack;
Expand Down Expand Up @@ -53,6 +55,8 @@ pub struct GuardedEnvelopeBuffer {
notify: tokio::sync::Notify,
/// Metric that counts how many push operations are waiting.
inflight_push_count: AtomicU64,
/// Last known capacity check result.
cached_capacity: AtomicBool,
}

impl GuardedEnvelopeBuffer {
Expand All @@ -69,6 +73,7 @@ impl GuardedEnvelopeBuffer {
}),
notify: tokio::sync::Notify::new(),
inflight_push_count: AtomicU64::new(0),
cached_capacity: AtomicBool::new(true),
})
} else {
None
Expand Down Expand Up @@ -138,9 +143,31 @@ impl GuardedEnvelopeBuffer {
}

/// Returns `true` if the buffer has capacity to accept more [`Envelope`]s.
pub async fn has_capacity(&self) -> bool {
let guard = self.inner.lock().await;
guard.backend.has_capacity()
///
/// This method tries to acquire the lock and read the latest capacity, but doesn't
/// guarantee that the returned value will be up to date, since lock contention could lead to
/// this method never acquiring the lock, thus returning the last known capacity value.
pub fn has_capacity(&self) -> bool {
match self.inner.try_lock() {
Ok(guard) => {
relay_statsd::metric!(
counter(RelayCounters::BufferCapacityCheck) += 1,
lock_acquired = "true"
);

let has_capacity = guard.backend.has_capacity();
self.cached_capacity.store(has_capacity, Ordering::Relaxed);
has_capacity
}
Err(_) => {
relay_statsd::metric!(
counter(RelayCounters::BufferCapacityCheck) += 1,
lock_acquired = "false"
);

self.cached_capacity.load(Ordering::Relaxed)
}
}
}

/// Returns the count of how many pushes are in flight and not been finished.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use relay_config::Config;
use crate::services::buffer::envelope_store::sqlite::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
use crate::services::buffer::envelope_store::EnvelopeStore;
use crate::services::buffer::stack_provider::StackProvider;
use crate::{Envelope, SqliteEnvelopeStack};

Expand Down
7 changes: 7 additions & 0 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,12 @@ pub enum RelayCounters {
/// - `state_out`: The new state. `memory`, `memory_file_standby`, or `disk`.
/// - `reason`: Why a transition was made (or not made).
BufferStateTransition,
/// Number of times the capacity is of the buffer is checked.
///
/// This metric is tagged with:
/// - `lock_acquired`: Whether the capacity check was done by acquiring the lock or using the
/// old value.
BufferCapacityCheck,
///
/// Number of outcomes and reasons for rejected Envelopes.
///
Expand Down Expand Up @@ -817,6 +823,7 @@ impl CounterMetric for RelayCounters {
RelayCounters::BufferEnvelopesWritten => "buffer.envelopes_written",
RelayCounters::BufferEnvelopesRead => "buffer.envelopes_read",
RelayCounters::BufferStateTransition => "buffer.state.transition",
RelayCounters::BufferCapacityCheck => "buffer.capacity_check",
RelayCounters::Outcomes => "events.outcomes",
RelayCounters::ProjectStateGet => "project_state.get",
RelayCounters::ProjectStateRequest => "project_state.request",
Expand Down
Loading