feat(spooler): Add capacity check for the envelope buffer #3925

Merged · 24 commits · Aug 28, 2024
Changes from 17 commits
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,10 @@

- Keep frames from both ends of the stacktrace when trimming frames. ([#3905](https://github.com/getsentry/relay/pull/3905))

**Internal**:

- Add `EnvelopeStore` trait and implement `DiskUsage` for tracking disk usage. ([#3925](https://github.com/getsentry/relay/pull/3925))

## 24.8.0

**Bug Fixes**:
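The `DiskUsage` tracker mentioned in the changelog entry reads SQLite's internal page statistics. As a rough sketch of that mechanism (not the PR's actual implementation; the function name and error handling are hypothetical), the on-disk size can be computed from the `page_count` and `page_size` pragmas:

    use sqlx::{Pool, Sqlite};

    /// Hypothetical sketch: estimate the spool's on-disk size from SQLite's
    /// internal page stats. `page_count * page_size` is the database file size
    /// in bytes, cheap enough to poll on a short refresh interval.
    async fn estimate_disk_usage(db: &Pool<Sqlite>) -> Result<u64, sqlx::Error> {
        let page_count: i64 = sqlx::query_scalar("PRAGMA page_count")
            .fetch_one(db)
            .await?;
        let page_size: i64 = sqlx::query_scalar("PRAGMA page_size")
            .fetch_one(db)
            .await?;
        Ok((page_count * page_size) as u64)
    }

Polling these stats on an interval (the `disk_usage_refresh_frequency_ms` option added below) avoids issuing the queries on every envelope.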
17 changes: 16 additions & 1 deletion relay-config/src/config.rs
@@ -864,6 +864,11 @@ fn spool_envelopes_max_envelope_delay_secs() -> u64 {
24 * 60 * 60
}

/// Default refresh frequency in ms for the disk usage monitoring.
fn spool_disk_usage_refresh_frequency_ms() -> u64 {
100
}

/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
@@ -879,7 +884,7 @@ pub struct EnvelopeSpool {
min_connections: u32,
/// The maximum size of the buffer to keep, in bytes.
///
- /// If not set the befault is 524288000 bytes (500MB).
+ /// If not set the default is 524288000 bytes (500MB).
#[serde(default = "spool_envelopes_max_disk_size")]
max_disk_size: ByteSize,
/// The maximum bytes to keep in the memory buffer before spooling envelopes to disk.
@@ -903,6 +908,10 @@ pub struct EnvelopeSpool {
/// they are dropped. Defaults to 24h.
#[serde(default = "spool_envelopes_max_envelope_delay_secs")]
max_envelope_delay_secs: u64,
/// How frequently the disk usage is refreshed, in milliseconds, by querying SQLite's
/// internal page stats.
#[serde(default = "spool_disk_usage_refresh_frequency_ms")]
disk_usage_refresh_frequency_ms: u64,
/// Version of the spooler.
#[serde(default)]
version: EnvelopeSpoolVersion,
@@ -938,6 +947,7 @@ impl Default for EnvelopeSpool {
disk_batch_size: spool_envelopes_stack_disk_batch_size(),
max_batches: spool_envelopes_stack_max_batches(),
max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
version: EnvelopeSpoolVersion::default(),
}
}
@@ -2162,6 +2172,11 @@ impl Config {
Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
}

/// Returns the refresh frequency for disk usage monitoring as a [`Duration`] object.
pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
}

/// Returns the maximum size of an event payload in bytes.
pub fn max_event_size(&self) -> usize {
self.values.limits.max_event_size.as_bytes()
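For illustration, the new option can be exercised end-to-end the same way the benchmark below builds its `Config` via `from_json_value` (a hypothetical usage sketch, not code from the PR):

    use std::time::Duration;
    use relay_config::Config;

    // Hypothetical sketch: the new field lives under `spool.envelopes` and is
    // exposed through the `Duration`-returning accessor added above.
    let config = Config::from_json_value(serde_json::json!({
        "spool": {
            "envelopes": {
                "disk_usage_refresh_frequency_ms": 50
            }
        }
    }))
    .unwrap();
    assert_eq!(
        config.spool_disk_usage_refresh_frequency_ms(),
        Duration::from_millis(50)
    );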
23 changes: 19 additions & 4 deletions relay-server/benches/benches.rs
@@ -4,13 +4,15 @@ use relay_config::Config;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use sqlx::{Pool, Sqlite};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tempfile::TempDir;
use tokio::runtime::Runtime;

use relay_base_schema::project::ProjectKey;
use relay_server::{
- Envelope, EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore,
+ Envelope, EnvelopeStack, MemoryChecker, MemoryStat, PolymorphicEnvelopeBuffer,
+ SqliteEnvelopeStack, SqliteEnvelopeStore,
};

fn setup_db(path: &PathBuf) -> Pool<Sqlite> {
@@ -72,7 +74,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.db");
let db = setup_db(&db_path);
- let envelope_store = SqliteEnvelopeStore::new(db.clone());
+ let envelope_store = SqliteEnvelopeStore::new(db.clone(), Duration::from_millis(100));

let runtime = Runtime::new().unwrap();

@@ -221,6 +223,17 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
let num_projects = 100000;
let envelopes_per_project = 10;

let config: Arc<Config> = Config::from_json_value(serde_json::json!({
"spool": {
"health": {
"max_memory_percent": 1.0
}
}
}))
.unwrap()
.into();
let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());

group.throughput(Throughput::Elements(
num_projects * envelopes_per_project as u64,
));
@@ -245,7 +258,8 @@
},
|envelopes| {
runtime.block_on(async {
- let mut buffer = PolymorphicEnvelopeBuffer::from_config(&Config::default());
+ let mut buffer =
+ PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone());
for envelope in envelopes.into_iter() {
buffer.push(envelope).await.unwrap();
}
@@ -274,7 +288,8 @@
},
|envelopes| {
runtime.block_on(async {
- let mut buffer = PolymorphicEnvelopeBuffer::from_config(&Config::default());
+ let mut buffer =
+ PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone());
let n = envelopes.len();
for envelope in envelopes.into_iter() {
let public_key = envelope.meta().public_key();
17 changes: 6 additions & 11 deletions relay-server/src/endpoints/common.rs
@@ -259,7 +259,7 @@ pub fn event_id_from_items(items: &Items) -> Result<Option<EventId>, BadStoreRequest>
///
/// Queueing can fail if the queue exceeds `envelope_buffer_size`. In this case, `Err` is
/// returned and the envelope is not queued.
- fn queue_envelope(
+ async fn queue_envelope(
state: &ServiceState,
mut managed_envelope: ManagedEnvelope,
) -> Result<(), BadStoreRequest> {
@@ -307,12 +307,14 @@ fn queue_envelope(

match state.envelope_buffer() {
Some(buffer) => {
if !buffer.has_capacity().await {
[Review thread on the capacity check]

Member: On second review, I think we should make this method sync, because we should avoid blocking the request handler on disk reads/writes under all circumstances. One option to make it sync would be to change GuardedEnvelopeBuffer::has_capacity to a try_lock, and cache the resulting bool:

        match self.inner.try_lock() {
            Ok(guard) => {
                let c = guard.backend.has_capacity();
                self.cached_capacity.store(c);
                c
            },
            Err(_) => self.cached_capacity.load()
        }

Member (Author): Yeah, this could be an idea: given the intrinsic delay of the memory and disk usage calculation, adding a bit more by caching should not be a big deal. My main worry is that we can't guarantee up-to-date values in any way if lock contention is arbitrarily high.

Member: Sounds acceptable to me. We can emit a counter metric to track how often we return cached vs. fresh capacity. Once we transform this into a service, the cached capacity will be updated differently anyway.

Member (Author): Yep, I added the metric because I am interested.

(A fuller, self-contained sketch of this caching pattern follows this file's diff.)

return Err(BadStoreRequest::QueueFailed);
}

// NOTE: This assumes that a `prefetch` has already been scheduled for both of
// the envelope's projects. See `handle_check_envelope`.
relay_log::trace!("Pushing envelope to V2 buffer");

- // TODO: Sync-check whether the buffer has capacity.
- // Otherwise return `QueueFailed`.
buffer.defer_push(envelope);
}
None => {
@@ -347,13 +349,6 @@ pub async fn handle_envelope(
)
}

- // TODO(jjbayer): Move this check to spool impl
- if state.memory_checker().check_memory().is_exceeded() {
-     // NOTE: Long-term, we should not reject the envelope here, but spool it to disk instead.
-     // This will be fixed with the new spool implementation.
-     return Err(BadStoreRequest::QueueFailed);
- };

let mut managed_envelope = ManagedEnvelope::new(
envelope,
state.outcome_aggregator().clone(),
@@ -393,7 +388,7 @@
return Err(BadStoreRequest::Overflow(offender));
}

- queue_envelope(state, managed_envelope)?;
+ queue_envelope(state, managed_envelope).await?;

if checked.rate_limits.is_limited() {
// Even if some envelope items have been queued, there might be active rate limits on
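As promised in the review thread above, here is a self-contained sketch of the suggested caching pattern (type, trait, and field names are hypothetical, not the PR's actual API): the capacity check stays synchronous by serving the last observed value whenever the mutex is contended.

    use std::sync::atomic::{AtomicBool, Ordering};
    use tokio::sync::Mutex;

    /// Hypothetical stand-in for the buffer backend's capacity query.
    trait Backend {
        fn has_capacity(&self) -> bool;
    }

    struct GuardedBuffer<B> {
        inner: Mutex<B>,
        /// Last capacity value observed while holding the lock.
        cached_capacity: AtomicBool,
    }

    impl<B: Backend> GuardedBuffer<B> {
        /// Sync capacity check: refreshes the cache when the lock is free and
        /// falls back to the cached value instead of blocking the request
        /// handler on disk reads/writes.
        fn has_capacity(&self) -> bool {
            match self.inner.try_lock() {
                Ok(guard) => {
                    let fresh = guard.has_capacity();
                    self.cached_capacity.store(fresh, Ordering::Relaxed);
                    fresh
                }
                Err(_) => self.cached_capacity.load(Ordering::Relaxed),
            }
        }
    }

The trade-off the author raises, stale values under high contention, is exactly what the added cached-vs-fresh counter metric is meant to surface.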
1 change: 1 addition & 0 deletions relay-server/src/lib.rs
@@ -271,6 +271,7 @@ pub use self::services::buffer::{
EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore,
}; // pub for benchmarks
pub use self::services::spooler::spool_utils;
pub use self::utils::{MemoryChecker, MemoryStat}; // pub for benchmarks

#[cfg(test)]
mod testutils;
6 changes: 5 additions & 1 deletion relay-server/src/service.rs
@@ -254,7 +254,11 @@ impl ServiceState {
upstream_relay.clone(),
global_config.clone(),
);
- let envelope_buffer = GuardedEnvelopeBuffer::from_config(&config).map(Arc::new);
+ let envelope_buffer = GuardedEnvelopeBuffer::from_config(
+     &config,
+     MemoryChecker::new(memory_stat.clone(), config.clone()),
+ )
+ .map(Arc::new);
ProjectCacheService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),