From 9ecb139d7544183d409f1114d01a67cfbc9c0776 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 12 Nov 2024 14:02:23 +0100 Subject: [PATCH 1/6] feat(spooler): Add CachingEnvelopeStack --- .../services/buffer/envelope_stack/caching.rs | 101 ++++++++++++++++++ .../src/services/buffer/envelope_stack/mod.rs | 1 + 2 files changed, 102 insertions(+) create mode 100644 relay-server/src/services/buffer/envelope_stack/caching.rs diff --git a/relay-server/src/services/buffer/envelope_stack/caching.rs b/relay-server/src/services/buffer/envelope_stack/caching.rs new file mode 100644 index 0000000000..ecbebba529 --- /dev/null +++ b/relay-server/src/services/buffer/envelope_stack/caching.rs @@ -0,0 +1,101 @@ +use chrono::{DateTime, Utc}; + +use super::EnvelopeStack; +use crate::envelope::Envelope; + +/// An envelope stack implementation that caches one element in memory and delegates +/// to another envelope stack for additional storage. +#[derive(Debug)] +pub struct CachingEnvelopeStack { + /// The underlying envelope stack + inner: S, + /// The cached envelope (if any) + cached: Option>, +} + +impl CachingEnvelopeStack +where + S: EnvelopeStack, +{ + /// Creates a new `CachingEnvelopeStack` wrapping the provided envelope stack + pub fn new(inner: S) -> Self { + Self { + inner, + cached: None, + } + } +} + +impl EnvelopeStack for CachingEnvelopeStack +where + S: EnvelopeStack, +{ + type Error = S::Error; + + async fn push(&mut self, envelope: Box) -> Result<(), Self::Error> { + if self.cached.is_none() { + // If we don't have a cached envelope, store this one in cache + self.cached = Some(envelope); + Ok(()) + } else { + // If we already have a cached envelope, push the current cached one + // to the inner stack and keep the new one in cache + let cached = self.cached.take().unwrap(); + self.inner.push(cached).await?; + self.cached = Some(envelope); + Ok(()) + } + } + + async fn peek(&mut self) -> Result>, Self::Error> { + if let Some(ref envelope) = self.cached { + Ok(Some(envelope.received_at())) + } else { + self.inner.peek().await + } + } + + async fn pop(&mut self) -> Result>, Self::Error> { + if let Some(envelope) = self.cached.take() { + Ok(Some(envelope)) + } else { + self.inner.pop().await + } + } + + async fn flush(mut self) { + if let Some(envelope) = self.cached { + let _ = self.inner.push(envelope).await; + } + self.inner.flush().await; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack; + use crate::services::buffer::testutils::utils::mock_envelope; + + #[tokio::test] + async fn test_caching_stack() { + let inner = MemoryEnvelopeStack::new(); + let mut stack = CachingEnvelopeStack::new(inner); + + // Create test envelopes with different timestamps + let env1 = mock_envelope(Utc::now()); + let env2 = mock_envelope(Utc::now()); + let env3 = mock_envelope(Utc::now()); + + // Push envelopes + stack.push(env1).await.unwrap(); + stack.push(env2).await.unwrap(); + stack.push(env3).await.unwrap(); + + // Pop them back + assert!(stack.pop().await.unwrap().is_some()); // Should come from cache + assert!(stack.pop().await.unwrap().is_some()); // Should come from inner + assert!(stack.pop().await.unwrap().is_some()); // Should come from inner + assert!(stack.pop().await.unwrap().is_none()); // Should be empty + } +} diff --git a/relay-server/src/services/buffer/envelope_stack/mod.rs b/relay-server/src/services/buffer/envelope_stack/mod.rs index a946e2dc6e..686e60fcc8 100644 --- a/relay-server/src/services/buffer/envelope_stack/mod.rs +++ b/relay-server/src/services/buffer/envelope_stack/mod.rs @@ -4,6 +4,7 @@ use chrono::{DateTime, Utc}; use crate::envelope::Envelope; +pub mod caching; pub mod memory; pub mod sqlite; From 3512f8e342b46a96fca3359f1ec74a62cf805e4b Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 12 Nov 2024 14:09:31 +0100 Subject: [PATCH 2/6] Fix --- .../src/services/buffer/stack_provider/sqlite.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 8fde4d79f5..19d7b6d867 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -3,6 +3,7 @@ use std::error::Error; use relay_config::Config; use crate::services::buffer::common::ProjectKeyPair; +use crate::services::buffer::envelope_stack::caching::CachingEnvelopeStack; use crate::services::buffer::envelope_store::sqlite::{ SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; @@ -38,7 +39,7 @@ impl SqliteStackProvider { } impl StackProvider for SqliteStackProvider { - type Stack = SqliteEnvelopeStack; + type Stack = CachingEnvelopeStack; async fn initialize(&self) -> InitializationState { match self.envelope_store.project_key_pairs().await { @@ -58,7 +59,7 @@ impl StackProvider for SqliteStackProvider { stack_creation_type: StackCreationType, project_key_pair: ProjectKeyPair, ) -> Self::Stack { - SqliteEnvelopeStack::new( + let inner = SqliteEnvelopeStack::new( self.envelope_store.clone(), self.batch_size_bytes, project_key_pair.own_key, @@ -69,7 +70,9 @@ impl StackProvider for SqliteStackProvider { // it was empty, or we never had data on disk for that stack, so we assume by default // that there is no need to check disk until some data is spooled. Self::assume_data_on_disk(stack_creation_type), - ) + ); + + CachingEnvelopeStack::new(inner) } fn has_store_capacity(&self) -> bool { From 469afadebc820c0a669421b8ef0b6ad247d31f6f Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 12 Nov 2024 14:11:43 +0100 Subject: [PATCH 3/6] Fix --- relay-server/src/services/buffer/envelope_stack/caching.rs | 2 +- relay-server/src/services/buffer/envelope_stack/memory.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_stack/caching.rs b/relay-server/src/services/buffer/envelope_stack/caching.rs index ecbebba529..abaa51ce18 100644 --- a/relay-server/src/services/buffer/envelope_stack/caching.rs +++ b/relay-server/src/services/buffer/envelope_stack/caching.rs @@ -17,7 +17,7 @@ impl CachingEnvelopeStack where S: EnvelopeStack, { - /// Creates a new `CachingEnvelopeStack` wrapping the provided envelope stack + /// Creates a new [`CachingEnvelopeStack`] wrapping the provided envelope stack pub fn new(inner: S) -> Self { Self { inner, diff --git a/relay-server/src/services/buffer/envelope_stack/memory.rs b/relay-server/src/services/buffer/envelope_stack/memory.rs index 66d8f4249b..28add148f5 100644 --- a/relay-server/src/services/buffer/envelope_stack/memory.rs +++ b/relay-server/src/services/buffer/envelope_stack/memory.rs @@ -24,7 +24,7 @@ impl EnvelopeStack for MemoryEnvelopeStack { } async fn peek(&mut self) -> Result>, Self::Error> { - Ok(self.0.last().map(|e| e.meta().received_at())) + Ok(self.0.last().map(|e| e.received_at())) } async fn pop(&mut self) -> Result>, Self::Error> { From 0962e92ef33a8bf1ae6c06e357f44c8eabf5833b Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 12 Nov 2024 14:50:02 +0100 Subject: [PATCH 4/6] Fix --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 801ce68d3f..ff27087221 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ - Add additional fields to the `Event` `Getter`. ([#4238](https://github.com/getsentry/relay/pull/4238)) - Replace u64 with `OrganizationId` new-type struct for organization id. ([#4159](https://github.com/getsentry/relay/pull/4159)) - Add computed contexts for `os`, `browser` and `runtime`. ([#4239](https://github.com/getsentry/relay/pull/4239)) +- Add `CachingEnvelopeStack` strategy to the buffer. ([#4242](https://github.com/getsentry/relay/pull/4242)) ## 24.10.0 From 32d4f6de4f198084b40f9471b55e58a1b4363d26 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 13 Nov 2024 08:59:30 +0100 Subject: [PATCH 5/6] Improve --- .../services/buffer/envelope_stack/caching.rs | 51 ++++++++++--------- .../src/services/buffer/envelope_stack/mod.rs | 2 +- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_stack/caching.rs b/relay-server/src/services/buffer/envelope_stack/caching.rs index abaa51ce18..06ce0dabdd 100644 --- a/relay-server/src/services/buffer/envelope_stack/caching.rs +++ b/relay-server/src/services/buffer/envelope_stack/caching.rs @@ -33,18 +33,12 @@ where type Error = S::Error; async fn push(&mut self, envelope: Box) -> Result<(), Self::Error> { - if self.cached.is_none() { - // If we don't have a cached envelope, store this one in cache - self.cached = Some(envelope); - Ok(()) - } else { - // If we already have a cached envelope, push the current cached one - // to the inner stack and keep the new one in cache - let cached = self.cached.take().unwrap(); + if let Some(cached) = self.cached.take() { self.inner.push(cached).await?; - self.cached = Some(envelope); - Ok(()) } + self.cached = Some(envelope); + + Ok(()) } async fn peek(&mut self) -> Result>, Self::Error> { @@ -65,7 +59,11 @@ where async fn flush(mut self) { if let Some(envelope) = self.cached { - let _ = self.inner.push(envelope).await; + if let Err(_) = self.inner.push(envelope).await { + relay_log::error!( + "error while pushing the cached envelope in the inner stack during flushing", + ); + } } self.inner.flush().await; } @@ -83,19 +81,22 @@ mod tests { let mut stack = CachingEnvelopeStack::new(inner); // Create test envelopes with different timestamps - let env1 = mock_envelope(Utc::now()); - let env2 = mock_envelope(Utc::now()); - let env3 = mock_envelope(Utc::now()); - - // Push envelopes - stack.push(env1).await.unwrap(); - stack.push(env2).await.unwrap(); - stack.push(env3).await.unwrap(); - - // Pop them back - assert!(stack.pop().await.unwrap().is_some()); // Should come from cache - assert!(stack.pop().await.unwrap().is_some()); // Should come from inner - assert!(stack.pop().await.unwrap().is_some()); // Should come from inner - assert!(stack.pop().await.unwrap().is_none()); // Should be empty + let envelope_1 = mock_envelope(Utc::now()); + let envelope_2 = mock_envelope(Utc::now()); + + // Push 2 envelopes + stack.push(envelope_1).await.unwrap(); + stack.push(envelope_2).await.unwrap(); + + // We pop the cached element. + assert!(stack.pop().await.unwrap().is_some()); + + // We peek the stack expecting it peeks the inner one. + assert!(stack.peek().await.unwrap().is_some()); + + // We pop the element and then check if the stack is empty. + assert!(stack.pop().await.unwrap().is_some()); + assert!(stack.peek().await.unwrap().is_none()); + assert!(stack.pop().await.unwrap().is_none()); } } diff --git a/relay-server/src/services/buffer/envelope_stack/mod.rs b/relay-server/src/services/buffer/envelope_stack/mod.rs index 686e60fcc8..8bf1b7253c 100644 --- a/relay-server/src/services/buffer/envelope_stack/mod.rs +++ b/relay-server/src/services/buffer/envelope_stack/mod.rs @@ -12,7 +12,7 @@ pub mod sqlite; pub trait EnvelopeStack: Send + std::fmt::Debug { /// The error type that is returned when an error is encountered during reading or writing the /// [`EnvelopeStack`]. - type Error: std::fmt::Debug; + type Error: std::fmt::Debug + std::error::Error; /// Pushes an [`Envelope`] on top of the stack. fn push(&mut self, envelope: Box) -> impl Future>; From db6d82bfd4accd0fea12dc4fab403831b24257c7 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 13 Nov 2024 09:02:20 +0100 Subject: [PATCH 6/6] Improve --- relay-server/src/services/buffer/envelope_stack/caching.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/services/buffer/envelope_stack/caching.rs b/relay-server/src/services/buffer/envelope_stack/caching.rs index 06ce0dabdd..68d83d6beb 100644 --- a/relay-server/src/services/buffer/envelope_stack/caching.rs +++ b/relay-server/src/services/buffer/envelope_stack/caching.rs @@ -59,7 +59,7 @@ where async fn flush(mut self) { if let Some(envelope) = self.cached { - if let Err(_) = self.inner.push(envelope).await { + if self.inner.push(envelope).await.is_err() { relay_log::error!( "error while pushing the cached envelope in the inner stack during flushing", );