Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spooler): Add CachingEnvelopeStack #4242

Merged
merged 7 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
- Add additional fields to the `Event` `Getter`. ([#4238](https://github.com/getsentry/relay/pull/4238))
- Replace u64 with `OrganizationId` new-type struct for organization id. ([#4159](https://github.com/getsentry/relay/pull/4159))
- Add computed contexts for `os`, `browser` and `runtime`. ([#4239](https://github.com/getsentry/relay/pull/4239))
- Add `CachingEnvelopeStack` strategy to the buffer. ([#4242](https://github.com/getsentry/relay/pull/4242))

## 24.10.0

Expand Down
102 changes: 102 additions & 0 deletions relay-server/src/services/buffer/envelope_stack/caching.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use chrono::{DateTime, Utc};

use super::EnvelopeStack;
use crate::envelope::Envelope;

/// An envelope stack implementation that caches one element in memory and delegates
/// to another envelope stack for additional storage.
#[derive(Debug)]
pub struct CachingEnvelopeStack<S> {
/// The underlying envelope stack
inner: S,
/// The cached envelope (if any)
cached: Option<Box<Envelope>>,
}

impl<S> CachingEnvelopeStack<S>
where
S: EnvelopeStack,
{
/// Creates a new [`CachingEnvelopeStack`] wrapping the provided envelope stack
pub fn new(inner: S) -> Self {
Self {
inner,
cached: None,
}
}
}

impl<S> EnvelopeStack for CachingEnvelopeStack<S>
where
S: EnvelopeStack,
{
type Error = S::Error;

async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), Self::Error> {
if let Some(cached) = self.cached.take() {
self.inner.push(cached).await?;
}
self.cached = Some(envelope);

Ok(())
}

async fn peek(&mut self) -> Result<Option<DateTime<Utc>>, Self::Error> {
if let Some(ref envelope) = self.cached {
Ok(Some(envelope.received_at()))
} else {
self.inner.peek().await
}
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved
}

async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
if let Some(envelope) = self.cached.take() {
Ok(Some(envelope))
} else {
self.inner.pop().await
}
}

async fn flush(mut self) {
if let Some(envelope) = self.cached {
if self.inner.push(envelope).await.is_err() {
relay_log::error!(
"error while pushing the cached envelope in the inner stack during flushing",
);
}
}
self.inner.flush().await;
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack;
use crate::services::buffer::testutils::utils::mock_envelope;

#[tokio::test]
async fn test_caching_stack() {
let inner = MemoryEnvelopeStack::new();
let mut stack = CachingEnvelopeStack::new(inner);

// Create test envelopes with different timestamps
let envelope_1 = mock_envelope(Utc::now());
let envelope_2 = mock_envelope(Utc::now());

// Push 2 envelopes
stack.push(envelope_1).await.unwrap();
stack.push(envelope_2).await.unwrap();

// We pop the cached element.
assert!(stack.pop().await.unwrap().is_some());

// We peek the stack expecting it peeks the inner one.
assert!(stack.peek().await.unwrap().is_some());

// We pop the element and then check if the stack is empty.
assert!(stack.pop().await.unwrap().is_some());
assert!(stack.peek().await.unwrap().is_none());
assert!(stack.pop().await.unwrap().is_none());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl EnvelopeStack for MemoryEnvelopeStack {
}

async fn peek(&mut self) -> Result<Option<DateTime<Utc>>, Self::Error> {
Ok(self.0.last().map(|e| e.meta().received_at()))
Ok(self.0.last().map(|e| e.received_at()))
}

async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
Expand Down
3 changes: 2 additions & 1 deletion relay-server/src/services/buffer/envelope_stack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ use chrono::{DateTime, Utc};

use crate::envelope::Envelope;

pub mod caching;
pub mod memory;
pub mod sqlite;

/// A stack-like data structure that holds [`Envelope`]s.
pub trait EnvelopeStack: Send + std::fmt::Debug {
/// The error type that is returned when an error is encountered during reading or writing the
/// [`EnvelopeStack`].
type Error: std::fmt::Debug;
type Error: std::fmt::Debug + std::error::Error;

/// Pushes an [`Envelope`] on top of the stack.
fn push(&mut self, envelope: Box<Envelope>) -> impl Future<Output = Result<(), Self::Error>>;
Expand Down
9 changes: 6 additions & 3 deletions relay-server/src/services/buffer/stack_provider/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::error::Error;
use relay_config::Config;

use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_stack::caching::CachingEnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
Expand Down Expand Up @@ -38,7 +39,7 @@ impl SqliteStackProvider {
}

impl StackProvider for SqliteStackProvider {
type Stack = SqliteEnvelopeStack;
type Stack = CachingEnvelopeStack<SqliteEnvelopeStack>;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, we want to directly use it with the SqliteStackProvider.


async fn initialize(&self) -> InitializationState {
match self.envelope_store.project_key_pairs().await {
Expand All @@ -58,7 +59,7 @@ impl StackProvider for SqliteStackProvider {
stack_creation_type: StackCreationType,
project_key_pair: ProjectKeyPair,
) -> Self::Stack {
SqliteEnvelopeStack::new(
let inner = SqliteEnvelopeStack::new(
self.envelope_store.clone(),
self.batch_size_bytes,
project_key_pair.own_key,
Expand All @@ -69,7 +70,9 @@ impl StackProvider for SqliteStackProvider {
// it was empty, or we never had data on disk for that stack, so we assume by default
// that there is no need to check disk until some data is spooled.
Self::assume_data_on_disk(stack_creation_type),
)
);

CachingEnvelopeStack::new(inner)
}

fn has_store_capacity(&self) -> bool {
Expand Down
Loading