diff --git a/CHANGELOG.md b/CHANGELOG.md index 64ca4675488..7283e9e1546 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,22 +2,20 @@ ## Unreleased -**Features:** - -- Extend project config API to be revision aware. ([#3947](https://github.com/getsentry/relay/pull/3947)). - **Bug Fixes**: - Keep frames from both ends of the stacktrace when trimming frames. ([#3905](https://github.com/getsentry/relay/pull/3905)) -**Internal**: - -- Record too long discard reason for session replays. ([#3950](https://github.com/getsentry/relay/pull/3950)) - **Features**: - Add configuration option to specify the instance type of Relay. ([#3938](https://github.com/getsentry/relay/pull/3938)) - Update definitions for user agent parsing. ([#3951](https://github.com/getsentry/relay/pull/3951)) +- Extend project config API to be revision aware. ([#3947](https://github.com/getsentry/relay/pull/3947)) + +**Internal**: + +- Record too long discard reason for session replays. ([#3950](https://github.com/getsentry/relay/pull/3950)) +- Add `EnvelopeStore` trait and implement `DiskUsage` for tracking disk usage. ([#3925](https://github.com/getsentry/relay/pull/3925)) ## 24.8.0 diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 5ad262d7f5d..b3b5d36bf67 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -907,6 +907,11 @@ fn spool_envelopes_max_envelope_delay_secs() -> u64 { 24 * 60 * 60 } +/// Default refresh frequency in ms for the disk usage monitoring. +fn spool_disk_usage_refresh_frequency_ms() -> u64 { + 100 +} + /// Persistent buffering configuration for incoming envelopes. #[derive(Debug, Serialize, Deserialize)] pub struct EnvelopeSpool { @@ -922,7 +927,7 @@ pub struct EnvelopeSpool { min_connections: u32, /// The maximum size of the buffer to keep, in bytes. /// - /// If not set the befault is 524288000 bytes (500MB). + /// If not set the default is 524288000 bytes (500MB). 
#[serde(default = "spool_envelopes_max_disk_size")] max_disk_size: ByteSize, /// The maximum bytes to keep in the memory buffer before spooling envelopes to disk, in bytes. @@ -946,6 +951,10 @@ pub struct EnvelopeSpool { /// they are dropped. Defaults to 24h. #[serde(default = "spool_envelopes_max_envelope_delay_secs")] max_envelope_delay_secs: u64, + /// The interval in ms at which the disk usage is refreshed by querying SQLite internal + /// page stats. Defaults to 100 ms. + #[serde(default = "spool_disk_usage_refresh_frequency_ms")] + disk_usage_refresh_frequency_ms: u64, /// Version of the spooler. #[serde(default)] version: EnvelopeSpoolVersion, @@ -981,6 +990,7 @@ impl Default for EnvelopeSpool { disk_batch_size: spool_envelopes_stack_disk_batch_size(), max_batches: spool_envelopes_stack_max_batches(), max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(), + disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(), version: EnvelopeSpoolVersion::default(), } } @@ -2216,6 +2226,11 @@ impl Config { Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs) } + /// Returns the refresh frequency for disk usage monitoring as a [`Duration`] object. + pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration { + Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms) + } + /// Returns the maximum size of an event payload in bytes. 
pub fn max_event_size(&self) -> usize { self.values.limits.max_event_size.as_bytes() diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs index ac6ff482572..b79dafcb489 100644 --- a/relay-server/benches/benches.rs +++ b/relay-server/benches/benches.rs @@ -4,13 +4,15 @@ use relay_config::Config; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; use sqlx::{Pool, Sqlite}; use std::path::PathBuf; +use std::sync::Arc; use std::time::{Duration, Instant}; use tempfile::TempDir; use tokio::runtime::Runtime; use relay_base_schema::project::ProjectKey; use relay_server::{ - Envelope, EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore, + Envelope, EnvelopeStack, MemoryChecker, MemoryStat, PolymorphicEnvelopeBuffer, + SqliteEnvelopeStack, SqliteEnvelopeStore, }; fn setup_db(path: &PathBuf) -> Pool { @@ -72,7 +74,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { let temp_dir = TempDir::new().unwrap(); let db_path = temp_dir.path().join("test.db"); let db = setup_db(&db_path); - let envelope_store = SqliteEnvelopeStore::new(db.clone()); + let envelope_store = SqliteEnvelopeStore::new(db.clone(), Duration::from_millis(100)); let runtime = Runtime::new().unwrap(); @@ -221,6 +223,17 @@ fn benchmark_envelope_buffer(c: &mut Criterion) { let num_projects = 100000; let envelopes_per_project = 10; + let config: Arc = Config::from_json_value(serde_json::json!({ + "spool": { + "health": { + "max_memory_percent": 1.0 + } + } + })) + .unwrap() + .into(); + let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); + group.throughput(Throughput::Elements( num_projects * envelopes_per_project as u64, )); @@ -245,7 +258,8 @@ fn benchmark_envelope_buffer(c: &mut Criterion) { }, |envelopes| { runtime.block_on(async { - let mut buffer = PolymorphicEnvelopeBuffer::from_config(&Config::default()); + let mut buffer = + PolymorphicEnvelopeBuffer::from_config(&config, 
memory_checker.clone()); for envelope in envelopes.into_iter() { buffer.push(envelope).await.unwrap(); } @@ -274,7 +288,8 @@ fn benchmark_envelope_buffer(c: &mut Criterion) { }, |envelopes| { runtime.block_on(async { - let mut buffer = PolymorphicEnvelopeBuffer::from_config(&Config::default()); + let mut buffer = + PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone()); let n = envelopes.len(); for envelope in envelopes.into_iter() { let public_key = envelope.meta().public_key(); diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 20c8ccf211c..22196079eb5 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -307,12 +307,14 @@ fn queue_envelope( match state.envelope_buffer() { Some(buffer) => { + if !buffer.has_capacity() { + return Err(BadStoreRequest::QueueFailed); + } + // NOTE: This assumes that a `prefetch` has already been scheduled for both the // envelope's projects. See `handle_check_envelope`. relay_log::trace!("Pushing envelope to V2 buffer"); - // TODO: Sync-check whether the buffer has capacity. - // Otherwise return `QueueFailed`. buffer.defer_push(envelope); } None => { @@ -347,7 +349,7 @@ pub async fn handle_envelope( ) } - // TODO(jjbayer): Move this check to spool impl + // TODO(jjbayer): Remove this check once spool v1 is removed. if state.memory_checker().check_memory().is_exceeded() { // NOTE: Long-term, we should not reject the envelope here, but spool it to disk instead. // This will be fixed with the new spool implementation. 
diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 66ba5bc6d66..986db2e5d4d 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -271,6 +271,7 @@ pub use self::services::buffer::{ EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore, }; // pub for benchmarks pub use self::services::spooler::spool_utils; +pub use self::utils::{MemoryChecker, MemoryStat}; // pub for benchmarks #[cfg(test)] mod testutils; diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 0770449bd39..26561c2d7dd 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -254,7 +254,11 @@ impl ServiceState { upstream_relay.clone(), global_config.clone(), ); - let envelope_buffer = GuardedEnvelopeBuffer::from_config(&config).map(Arc::new); + let envelope_buffer = GuardedEnvelopeBuffer::from_config( + &config, + MemoryChecker::new(memory_stat.clone(), config.clone()), + ) + .map(Arc::new); ProjectCacheService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 99e50a09bac..d34536d5add 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -8,11 +8,13 @@ use relay_config::Config; use crate::envelope::Envelope; use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError; -use crate::services::buffer::envelope_stack::{EnvelopeStack, StackProvider}; -use crate::services::buffer::sqlite_envelope_store::SqliteEnvelopeStoreError; +use crate::services::buffer::envelope_stack::EnvelopeStack; +use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; use crate::services::buffer::stack_provider::memory::MemoryStackProvider; use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; +use 
crate::services::buffer::stack_provider::StackProvider; use crate::statsd::{RelayCounters, RelayGauges}; +use crate::utils::MemoryChecker; /// Polymorphic envelope buffering interface. /// @@ -34,11 +36,12 @@ pub enum PolymorphicEnvelopeBuffer { impl PolymorphicEnvelopeBuffer { /// Creates either a memory-based or a disk-based envelope buffer, /// depending on the given configuration. - pub fn from_config(config: &Config) -> Self { + pub fn from_config(config: &Config, memory_checker: MemoryChecker) -> Self { if config.spool_envelopes_path().is_some() { panic!("Disk backend not yet supported for spool V2"); } - Self::InMemory(EnvelopeBuffer::::new()) + + Self::InMemory(EnvelopeBuffer::::new(memory_checker)) } /// Adds an envelope to the buffer. @@ -71,13 +74,21 @@ impl PolymorphicEnvelopeBuffer { /// Marks a project as ready or not ready. /// - /// The buffer reprioritizes its envelopes based on this information. + /// The buffer re-prioritizes its envelopes based on this information. pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { match self { Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready), Self::InMemory(buffer) => buffer.mark_ready(project, is_ready), } } + + /// Returns `true` if the buffer has capacity to accept new [`Envelope`]s. + pub fn has_capacity(&self) -> bool { + match self { + Self::Sqlite(buffer) => buffer.has_capacity(), + Self::InMemory(buffer) => buffer.has_capacity(), + } + } } /// Error that occurs while interacting with the envelope buffer. @@ -86,6 +97,9 @@ pub enum EnvelopeBufferError { #[error("sqlite")] Sqlite(#[from] SqliteEnvelopeStackError), + #[error("failed to push envelope to the buffer")] + PushFailed, + #[error("impossible")] Impossible(#[from] Infallible), } @@ -100,7 +114,7 @@ struct EnvelopeBuffer { priority_queue: priority_queue::PriorityQueue, Priority>, /// A lookup table to find all stacks involving a project. 
stacks_by_project: hashbrown::HashMap>, - /// A helper to create new stacks. + /// A provider of stacks that provides utilities to create stacks, check their capacity... /// /// This indirection is needed because different stack implementations might need different /// initialization (e.g. a database connection). @@ -108,19 +122,19 @@ struct EnvelopeBuffer { } impl EnvelopeBuffer { - /// Creates an empty buffer. - pub fn new() -> Self { + /// Creates an empty memory-based buffer. + pub fn new(memory_checker: MemoryChecker) -> Self { Self { stacks_by_project: Default::default(), priority_queue: Default::default(), - stack_provider: MemoryStackProvider, + stack_provider: MemoryStackProvider::new(memory_checker), } } } #[allow(dead_code)] impl EnvelopeBuffer { - /// Creates an empty buffer. + /// Creates an empty sqlite-based buffer. pub async fn new(config: &Config) -> Result { Ok(Self { stacks_by_project: Default::default(), @@ -132,7 +146,7 @@ impl EnvelopeBuffer { impl EnvelopeBuffer

where - EnvelopeBufferError: std::convert::From<::Error>, + EnvelopeBufferError: From<::Error>, { /// Pushes an envelope to the appropriate envelope stack and reprioritizes the stack. /// @@ -221,7 +235,7 @@ where Ok(Some(envelope)) } - /// Reprioritizes all stacks that involve the given project key by setting it to "ready". + /// Re-prioritizes all stacks that involve the given project key by setting it to "ready". pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { let mut changed = false; if let Some(stack_keys) = self.stacks_by_project.get(project) { @@ -272,6 +286,11 @@ where ); } + /// Returns `true` if the underlying storage has the capacity to store more envelopes. + pub fn has_capacity(&self) -> bool { + self.stack_provider.has_store_capacity() + } + fn pop_stack(&mut self, stack_key: StackKey) { for project_key in stack_key.iter() { self.stacks_by_project @@ -420,14 +439,16 @@ impl Readiness { #[cfg(test)] mod tests { use std::str::FromStr; - use uuid::Uuid; + use std::sync::Arc; use relay_common::Dsn; use relay_event_schema::protocol::EventId; use relay_sampling::DynamicSamplingContext; + use uuid::Uuid; use crate::envelope::{Item, ItemType}; use crate::extractors::RequestMeta; + use crate::utils::MemoryStat; use super::*; @@ -461,9 +482,23 @@ mod tests { envelope } + fn mock_memory_checker() -> MemoryChecker { + let config: Arc<_> = Config::from_json_value(serde_json::json!({ + "spool": { + "health": { + "max_memory_percent": 1.0 + } + } + })) + .unwrap() + .into(); + + MemoryChecker::new(MemoryStat::default(), config.clone()) + } + #[tokio::test] async fn test_insert_pop() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); @@ -547,7 +582,7 @@ mod tests { #[tokio::test] async fn 
test_project_internal_order() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); @@ -574,7 +609,7 @@ mod tests { #[tokio::test] async fn test_sampling_projects() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); @@ -652,7 +687,7 @@ mod tests { assert_ne!(stack_key1, stack_key2); - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); buffer .push(new_envelope(project_key1, Some(project_key2), None)) .await @@ -666,7 +701,7 @@ mod tests { #[tokio::test] async fn test_last_peek_internal_order() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let event_id_1 = EventId::new(); diff --git a/relay-server/src/services/buffer/envelope_stack/mod.rs b/relay-server/src/services/buffer/envelope_stack/mod.rs index ee48016f099..8e48f7391fe 100644 --- a/relay-server/src/services/buffer/envelope_stack/mod.rs +++ b/relay-server/src/services/buffer/envelope_stack/mod.rs @@ -20,9 +20,3 @@ pub trait EnvelopeStack: Send + std::fmt::Debug { /// Pops the [`Envelope`] on top of the stack. 
fn pop(&mut self) -> impl Future>, Self::Error>>; } - -pub trait StackProvider: std::fmt::Debug { - type Stack: EnvelopeStack; - - fn create_stack(&self, envelope: Box) -> Self::Stack; -} diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 9ea8d16a66e..2117ae3d538 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -6,7 +6,7 @@ use relay_base_schema::project::ProjectKey; use crate::envelope::Envelope; use crate::services::buffer::envelope_stack::EnvelopeStack; -use crate::services::buffer::sqlite_envelope_store::{ +use crate::services::buffer::envelope_store::sqlite::{ SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; use crate::statsd::RelayCounters; @@ -289,7 +289,7 @@ mod tests { #[should_panic] async fn test_push_with_mismatching_project_keys() { let db = setup_db(false).await; - let envelope_store = SqliteEnvelopeStore::new(db); + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); let mut stack = SqliteEnvelopeStack::new( envelope_store, 2, @@ -305,7 +305,7 @@ mod tests { #[tokio::test] async fn test_push_when_db_is_not_valid() { let db = setup_db(false).await; - let envelope_store = SqliteEnvelopeStore::new(db); + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); let mut stack = SqliteEnvelopeStack::new( envelope_store, 2, @@ -357,7 +357,7 @@ mod tests { #[tokio::test] async fn test_pop_when_db_is_not_valid() { let db = setup_db(false).await; - let envelope_store = SqliteEnvelopeStore::new(db); + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); let mut stack = SqliteEnvelopeStack::new( envelope_store, 2, @@ -376,7 +376,7 @@ mod tests { #[tokio::test] async fn test_pop_when_stack_is_empty() { let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db); + let envelope_store = 
SqliteEnvelopeStore::new(db, Duration::from_millis(100)); let mut stack = SqliteEnvelopeStack::new( envelope_store, 2, @@ -393,7 +393,7 @@ mod tests { #[tokio::test] async fn test_push_below_threshold_and_pop() { let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db); + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); let mut stack = SqliteEnvelopeStack::new( envelope_store, 5, @@ -430,7 +430,7 @@ mod tests { #[tokio::test] async fn test_push_above_threshold_and_pop() { let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db); + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); let mut stack = SqliteEnvelopeStack::new( envelope_store, 5, diff --git a/relay-server/src/services/buffer/envelope_store/mod.rs b/relay-server/src/services/buffer/envelope_store/mod.rs new file mode 100644 index 00000000000..6b1c1083e34 --- /dev/null +++ b/relay-server/src/services/buffer/envelope_store/mod.rs @@ -0,0 +1 @@ +pub mod sqlite; diff --git a/relay-server/src/services/buffer/sqlite_envelope_store.rs b/relay-server/src/services/buffer/envelope_store/sqlite.rs similarity index 78% rename from relay-server/src/services/buffer/sqlite_envelope_store.rs rename to relay-server/src/services/buffer/envelope_store/sqlite.rs index 83616bf55af..3969d540753 100644 --- a/relay-server/src/services/buffer/sqlite_envelope_store.rs +++ b/relay-server/src/services/buffer/envelope_store/sqlite.rs @@ -1,11 +1,19 @@ use std::error::Error; use std::path::Path; use std::pin::pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use crate::envelope::EnvelopeError; +use crate::extractors::StartTime; +use crate::statsd::RelayGauges; +use crate::Envelope; use futures::stream::StreamExt; use hashbrown::HashSet; use relay_base_schema::project::{ParseProjectKeyError, ProjectKey}; use relay_config::Config; +use relay_statsd::metric; use 
sqlx::migrate::MigrateError; use sqlx::query::Query; use sqlx::sqlite::{ @@ -14,10 +22,7 @@ use sqlx::sqlite::{ }; use sqlx::{Pool, QueryBuilder, Row, Sqlite}; use tokio::fs::DirBuilder; - -use crate::envelope::EnvelopeError; -use crate::extractors::StartTime; -use crate::Envelope; +use tokio::time::sleep; /// Struct that contains all the fields of an [`Envelope`] that are mapped to the database columns. pub struct InsertEnvelope { @@ -88,6 +93,93 @@ pub enum SqliteEnvelopeStoreError { FileSizeReadFailed(sqlx::Error), } +#[derive(Debug, Clone)] +struct DiskUsage { + db: Pool, + last_known_usage: Arc, + refresh_frequency: Duration, +} + +impl DiskUsage { + /// Creates a new empty [`DiskUsage`]. + fn new(db: Pool, refresh_frequency: Duration) -> Self { + Self { + db, + last_known_usage: Arc::new(AtomicU64::new(0)), + refresh_frequency, + } + } + + /// Prepares a [`DiskUsage`] instance with an initial reading of the database usage and fails + /// if no reading can be made. + pub async fn prepare( + db: Pool, + refresh_frequency: Duration, + ) -> Result { + let usage = Self::estimate_usage(&db).await?; + + let disk_usage = Self::new(db, refresh_frequency); + disk_usage.last_known_usage.store(usage, Ordering::Relaxed); + disk_usage.start_background_refresh(); + + Ok(disk_usage) + } + + /// Returns the last known disk usage without querying the database; the value is only + /// refreshed by the background task started in `prepare`. + fn usage(&self) -> u64 { + self.last_known_usage.load(Ordering::Relaxed) + } + + /// Starts a background tokio task to update the database usage. + fn start_background_refresh(&self) { + let db = self.db.clone(); + // We get a weak reference, to make sure that if `DiskUsage` is dropped, the reference can't + // be upgraded, causing the loop in the tokio task to exit. 
+ let last_known_usage_weak = Arc::downgrade(&self.last_known_usage); + let refresh_frequency = self.refresh_frequency; + + tokio::spawn(async move { + loop { + // When our `Weak` reference can't be upgraded to an `Arc`, it means that the value + // is not referenced anymore by self, meaning that `DiskUsage` was dropped. + let Some(last_known_usage) = last_known_usage_weak.upgrade() else { + break; + }; + + let usage = Self::estimate_usage(&db).await; + let Ok(usage) = usage else { + relay_log::error!("failed to update the disk usage asynchronously"); + return; + }; + + let current = last_known_usage.load(Ordering::Relaxed); + if last_known_usage + .compare_exchange_weak(current, usage, Ordering::Relaxed, Ordering::Relaxed) + .is_err() + { + relay_log::error!("failed to update the disk usage asynchronously"); + }; + + sleep(refresh_frequency).await; + } + }); + } + + /// Estimates the disk usage of the SQLite database. + async fn estimate_usage(db: &Pool) -> Result { + let usage: i64 = build_estimate_size() + .fetch_one(db) + .await + .and_then(|r| r.try_get(0)) + .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed)?; + + metric!(gauge(RelayGauges::BufferDiskUsed) = usage as u64); + + Ok(usage as u64) + } +} + /// Struct that offers access to a SQLite-based store of [`Envelope`]s. /// /// The goal of this struct is to hide away all the complexity of dealing with the database for @@ -95,12 +187,16 @@ pub enum SqliteEnvelopeStoreError { #[derive(Debug, Clone)] pub struct SqliteEnvelopeStore { db: Pool, + disk_usage: DiskUsage, } impl SqliteEnvelopeStore { /// Initializes the [`SqliteEnvelopeStore`] with a supplied [`Pool`]. 
- pub fn new(db: Pool) -> Self { - Self { db } + pub fn new(db: Pool, refresh_frequency: Duration) -> Self { + Self { + db: db.clone(), + disk_usage: DiskUsage::new(db, refresh_frequency), + } } /// Prepares the [`SqliteEnvelopeStore`] by running all the necessary migrations and preparing @@ -132,7 +228,7 @@ impl SqliteEnvelopeStore { .synchronous(SqliteSynchronous::Normal) // The freelist pages are moved to the end of the database file and the database file is truncated to remove the freelist pages at every // transaction commit. Note, however, that auto-vacuum only truncates the freelist pages from the file. - // Auto-vacuum does not defragment the database nor repack individual database pages the way that the VACUUM command does. + // Auto-vacuum does not de-fragment the database nor repack individual database pages the way that the VACUUM command does. // // This will help us to keep the file size under some control. .auto_vacuum(SqliteAutoVacuum::Full) @@ -148,7 +244,11 @@ impl SqliteEnvelopeStore { .await .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?; - Ok(SqliteEnvelopeStore { db }) + Ok(SqliteEnvelopeStore { + db: db.clone(), + disk_usage: DiskUsage::prepare(db, config.spool_disk_usage_refresh_frequency_ms()) + .await?, + }) } /// Set up the database and return the current number of envelopes. @@ -196,7 +296,7 @@ impl SqliteEnvelopeStore { /// Inserts one or more envelopes into the database. pub async fn insert_many( - &self, + &mut self, envelopes: impl IntoIterator, ) -> Result<(), SqliteEnvelopeStoreError> { if let Err(err) = build_insert_many_envelopes(envelopes.into_iter()) @@ -217,7 +317,7 @@ impl SqliteEnvelopeStore { /// Deletes and returns at most `limit` [`Envelope`]s from the database. pub async fn delete_many( - &self, + &mut self, own_key: ProjectKey, sampling_key: ProjectKey, limit: i64, @@ -296,13 +396,9 @@ impl SqliteEnvelopeStore { Ok(project_key_pairs) } - /// Returns an approximate measure of the size of the database. 
- pub async fn used_size(&self) -> Result { - build_estimate_size() - .fetch_one(&self.db) - .await - .and_then(|r| r.try_get(0)) - .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed) + /// Returns an approximate measure of the used size of the database. + pub fn usage(&self) -> u64 { + self.disk_usage.usage() } } @@ -409,6 +505,7 @@ mod tests { use hashbrown::HashSet; use std::collections::BTreeMap; use std::time::{Duration, Instant}; + use tokio::time::sleep; use uuid::Uuid; use relay_base_schema::project::ProjectKey; @@ -464,7 +561,7 @@ mod tests { #[tokio::test] async fn test_insert_and_delete_envelopes() { let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db); + let mut envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); @@ -492,7 +589,7 @@ mod tests { #[tokio::test] async fn test_insert_and_get_project_keys_pairs() { let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db); + let mut envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); @@ -513,4 +610,31 @@ mod tests { (own_key, sampling_key) ); } + + #[tokio::test] + async fn test_estimate_disk_usage() { + let db = setup_db(true).await; + let mut store = SqliteEnvelopeStore::new(db.clone(), Duration::from_millis(1)); + let disk_usage = DiskUsage::prepare(db, Duration::from_millis(1)) + .await + .unwrap(); + + // We read the disk usage without envelopes stored. + let usage_1 = disk_usage.usage(); + assert!(usage_1 > 0); + + // We write 10 envelopes to increase the disk usage. 
+ let envelopes = mock_envelopes(10); + store + .insert_many(envelopes.iter().map(|e| e.as_ref().try_into().unwrap())) + .await + .unwrap(); + + // We wait for the refresh timeout of the disk usage task. + sleep(Duration::from_millis(2)).await; + + // We now expect to read more disk usage because of the 10 elements. + let usage_2 = disk_usage.usage(); + assert!(usage_2 >= usage_1); + } } diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 0a2e40940d3..66c5bff07e9 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -1,6 +1,6 @@ //! Types for buffering envelopes. -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use relay_base_schema::project::ProjectKey; @@ -8,17 +8,18 @@ use relay_config::Config; use tokio::sync::MutexGuard; use crate::envelope::Envelope; -use crate::utils::ManagedEnvelope; +use crate::utils::{ManagedEnvelope, MemoryChecker}; +use crate::statsd::RelayCounters; pub use envelope_buffer::EnvelopeBufferError; pub use envelope_buffer::PolymorphicEnvelopeBuffer; pub use envelope_stack::sqlite::SqliteEnvelopeStack; // pub for benchmarks pub use envelope_stack::EnvelopeStack; // pub for benchmarks -pub use sqlite_envelope_store::SqliteEnvelopeStore; // pub for benchmarks // pub for benchmarks +pub use envelope_store::sqlite::SqliteEnvelopeStore; // pub for benchmarks mod envelope_buffer; mod envelope_stack; -mod sqlite_envelope_store; +mod envelope_store; mod stack_provider; mod testutils; @@ -53,6 +54,8 @@ pub struct GuardedEnvelopeBuffer { notify: tokio::sync::Notify, /// Metric that counts how many push operations are waiting. inflight_push_count: AtomicU64, + /// Last known capacity check result. 
+ cached_capacity: AtomicBool, } impl GuardedEnvelopeBuffer { @@ -60,15 +63,16 @@ impl GuardedEnvelopeBuffer { /// /// NOTE: until the V1 spooler implementation is removed, this function returns `None` /// if V2 spooling is not configured. - pub fn from_config(config: &Config) -> Option { + pub fn from_config(config: &Config, memory_checker: MemoryChecker) -> Option { if config.spool_v2() { Some(Self { inner: tokio::sync::Mutex::new(Inner { - backend: PolymorphicEnvelopeBuffer::from_config(config), + backend: PolymorphicEnvelopeBuffer::from_config(config, memory_checker), should_peek: true, }), notify: tokio::sync::Notify::new(), inflight_push_count: AtomicU64::new(0), + cached_capacity: AtomicBool::new(true), }) } else { None @@ -137,6 +141,34 @@ impl GuardedEnvelopeBuffer { } } + /// Returns `true` if the buffer has capacity to accept more [`Envelope`]s. + /// + /// This method tries to acquire the lock and read the latest capacity, but doesn't + /// guarantee that the returned value will be up to date, since lock contention could lead to + /// this method never acquiring the lock, thus returning the last known capacity value. + pub fn has_capacity(&self) -> bool { + match self.inner.try_lock() { + Ok(guard) => { + relay_statsd::metric!( + counter(RelayCounters::BufferCapacityCheck) += 1, + lock_acquired = "true" + ); + + let has_capacity = guard.backend.has_capacity(); + self.cached_capacity.store(has_capacity, Ordering::Relaxed); + has_capacity + } + Err(_) => { + relay_statsd::metric!( + counter(RelayCounters::BufferCapacityCheck) += 1, + lock_acquired = "false" + ); + + self.cached_capacity.load(Ordering::Relaxed) + } + } + } + /// Returns the count of how many pushes are in flight and not been finished. 
pub fn inflight_push_count(&self) -> u64 { self.inflight_push_count.load(Ordering::Relaxed) @@ -218,22 +250,28 @@ mod tests { use relay_common::Dsn; use crate::extractors::RequestMeta; + use crate::utils::MemoryStat; use super::*; fn new_buffer() -> Arc { - GuardedEnvelopeBuffer::from_config( - &Config::from_json_value(serde_json::json!({ + let config = Arc::new( + Config::from_json_value(serde_json::json!({ "spool": { "envelopes": { - "version": "experimental" + "version": "experimental", + "max_memory_percent": 1.0 } } })) .unwrap(), - ) - .unwrap() - .into() + ); + + let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); + + GuardedEnvelopeBuffer::from_config(&config, memory_checker) + .unwrap() + .into() } fn new_envelope() -> Box { diff --git a/relay-server/src/services/buffer/stack_provider/memory.rs b/relay-server/src/services/buffer/stack_provider/memory.rs index b3fe5c3bb38..66f367f38a9 100644 --- a/relay-server/src/services/buffer/stack_provider/memory.rs +++ b/relay-server/src/services/buffer/stack_provider/memory.rs @@ -1,9 +1,20 @@ use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack; -use crate::services::buffer::envelope_stack::StackProvider; +use crate::services::buffer::stack_provider::StackProvider; +use crate::utils::MemoryChecker; use crate::Envelope; #[derive(Debug)] -pub struct MemoryStackProvider; +pub struct MemoryStackProvider { + memory_checker: MemoryChecker, +} + +impl MemoryStackProvider { + /// Creates a new [`MemoryStackProvider`] with a given [`MemoryChecker`] that is used to + /// estimate the capacity. 
+ pub fn new(memory_checker: MemoryChecker) -> Self { + Self { memory_checker } + } +} impl StackProvider for MemoryStackProvider { type Stack = MemoryEnvelopeStack; @@ -11,4 +22,8 @@ impl StackProvider for MemoryStackProvider { fn create_stack(&self, envelope: Box) -> Self::Stack { MemoryEnvelopeStack::new(envelope) } + + fn has_store_capacity(&self) -> bool { + self.memory_checker.check_memory().has_capacity() + } } diff --git a/relay-server/src/services/buffer/stack_provider/mod.rs b/relay-server/src/services/buffer/stack_provider/mod.rs index ae663f641db..79e36c43934 100644 --- a/relay-server/src/services/buffer/stack_provider/mod.rs +++ b/relay-server/src/services/buffer/stack_provider/mod.rs @@ -1,2 +1,17 @@ +use crate::{Envelope, EnvelopeStack}; + pub mod memory; pub mod sqlite; + +/// A provider of [`EnvelopeStack`] instances that is responsible for creating them. +pub trait StackProvider: std::fmt::Debug { + /// The implementation of [`EnvelopeStack`] that this manager creates. + type Stack: EnvelopeStack; + + /// Creates an [`EnvelopeStack`]. + fn create_stack(&self, envelope: Box) -> Self::Stack; + + /// Returns `true` if the store used by this [`StackProvider`] has space to add new + /// stacks or items to the stacks. 
+ fn has_store_capacity(&self) -> bool; +} diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 9716585606a..a2d49c62d1e 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -1,9 +1,9 @@ use relay_config::Config; -use crate::services::buffer::envelope_stack::StackProvider; -use crate::services::buffer::sqlite_envelope_store::{ +use crate::services::buffer::envelope_store::sqlite::{ SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; +use crate::services::buffer::stack_provider::StackProvider; use crate::{Envelope, SqliteEnvelopeStack}; #[derive(Debug)] @@ -11,17 +11,19 @@ pub struct SqliteStackProvider { envelope_store: SqliteEnvelopeStore, disk_batch_size: usize, max_batches: usize, + max_disk_size: usize, } #[warn(dead_code)] impl SqliteStackProvider { - /// Creates a new [`SqliteStackProvider`] from the provided path to the SQLite database file. + /// Creates a new [`SqliteStackProvider`] from the provided [`Config`]. 
pub async fn new(config: &Config) -> Result { let envelope_store = SqliteEnvelopeStore::prepare(config).await?; Ok(Self { envelope_store, disk_batch_size: config.spool_envelopes_stack_disk_batch_size(), max_batches: config.spool_envelopes_stack_max_batches(), + max_disk_size: config.spool_envelopes_max_disk_size(), }) } } @@ -41,4 +43,8 @@ impl StackProvider for SqliteStackProvider { sampling_key, ) } + + fn has_store_capacity(&self) -> bool { + (self.envelope_store.usage() as usize) < self.max_disk_size + } } diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index c7d7331445b..c7739992ccf 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -1616,7 +1616,8 @@ mod tests { .unwrap() .into(); let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); - let envelope_buffer = GuardedEnvelopeBuffer::from_config(&config).map(Arc::new); + let envelope_buffer = + GuardedEnvelopeBuffer::from_config(&config, memory_checker.clone()).map(Arc::new); let buffer_services = spooler::Services { outcome_aggregator: services.outcome_aggregator.clone(), project_cache: services.project_cache.clone(), diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 1113d528fc8..00b0773df75 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -34,6 +34,8 @@ pub enum RelayGauges { /// /// Per combination of `(own_key, sampling_key)`, a new stack is created. BufferStackCount, + /// The used disk for the buffer. + BufferDiskUsed, /// The currently used memory by the entire system. /// /// Relay uses the same value for its memory health check. 
@@ -60,6 +62,7 @@ impl GaugeMetric for RelayGauges { RelayGauges::BufferPeriodicUnspool => "buffer.unspool.periodic", RelayGauges::BufferPushInFlight => "buffer.push_inflight", RelayGauges::BufferStackCount => "buffer.stack_count", + RelayGauges::BufferDiskUsed => "buffer.disk_used", RelayGauges::SystemMemoryUsed => "health.system_memory.used", RelayGauges::SystemMemoryTotal => "health.system_memory.total", #[cfg(feature = "processing")] @@ -600,6 +603,12 @@ pub enum RelayCounters { /// - `state_out`: The new state. `memory`, `memory_file_standby`, or `disk`. /// - `reason`: Why a transition was made (or not made). BufferStateTransition, + /// Number of times the capacity of the buffer is checked. + /// + /// This metric is tagged with: + /// - `lock_acquired`: Whether the capacity check was done by acquiring the lock or using the + /// old value. + BufferCapacityCheck, /// /// Number of outcomes and reasons for rejected Envelopes. /// @@ -814,6 +823,7 @@ impl CounterMetric for RelayCounters { RelayCounters::BufferEnvelopesWritten => "buffer.envelopes_written", RelayCounters::BufferEnvelopesRead => "buffer.envelopes_read", RelayCounters::BufferStateTransition => "buffer.state.transition", + RelayCounters::BufferCapacityCheck => "buffer.capacity_check", RelayCounters::Outcomes => "events.outcomes", RelayCounters::ProjectStateGet => "project_state.get", RelayCounters::ProjectStateRequest => "project_state.request", diff --git a/relay-server/src/utils/memory.rs b/relay-server/src/utils/memory.rs index d22b576b6f9..4c4c53a1ed1 100644 --- a/relay-server/src/utils/memory.rs +++ b/relay-server/src/utils/memory.rs @@ -57,7 +57,7 @@ struct Inner { refresh_frequency_ms: u64, } -/// Wrapper around [`Inner`] which hides the [`Arc`] and exposes utils method to make working with +/// Wrapper which hides the [`Arc`] and exposes utility methods to make working with /// [`MemoryStat`] as opaque as possible.
#[derive(Clone)] pub struct MemoryStat(Arc<Inner>); @@ -77,7 +77,7 @@ impl MemoryStat { })) } - /// Returns a copy of the most up to date [`Memory`] data. + /// Returns a copy of the most up-to-date memory data. pub fn memory(&self) -> Memory { self.try_update(); **self.0.memory.load() @@ -177,7 +177,7 @@ impl MemoryCheck { /// decides how memory readings are interpreted. #[derive(Clone, Debug)] pub struct MemoryChecker { - pub memory_stat: MemoryStat, + memory_stat: MemoryStat, config: Arc<Config>, } diff --git a/relay-ua/uap-core b/relay-ua/uap-core index d668d6c6157..bbd43aed9a6 160000 --- a/relay-ua/uap-core +++ b/relay-ua/uap-core @@ -1 +1 @@ -Subproject commit d668d6c6157db7737edfc0280adc6610c1b88029 +Subproject commit bbd43aed9a623486191a33c3af9e463e89c85f7a