Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spooler): Add CachingEnvelopeStack #4242

Merged
merged 7 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions relay-server/src/services/buffer/envelope_stack/caching.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use chrono::{DateTime, Utc};

use super::EnvelopeStack;
use crate::envelope::Envelope;

/// An envelope stack implementation that caches one element in memory and delegates
/// to another envelope stack for additional storage.
#[derive(Debug)]
pub struct CachingEnvelopeStack<S> {
/// The underlying envelope stack
inner: S,
/// The cached envelope (if any)
cached: Option<Box<Envelope>>,
}

impl<S> CachingEnvelopeStack<S>
where
S: EnvelopeStack,
{
/// Creates a new [`CachingEnvelopeStack`] wrapping the provided envelope stack
pub fn new(inner: S) -> Self {
Self {
inner,
cached: None,
}
}
}

impl<S> EnvelopeStack for CachingEnvelopeStack<S>
where
S: EnvelopeStack,
{
type Error = S::Error;

async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), Self::Error> {
if self.cached.is_none() {
// If we don't have a cached envelope, store this one in cache
self.cached = Some(envelope);
Ok(())
} else {
// If we already have a cached envelope, push the current cached one
// to the inner stack and keep the new one in cache
let cached = self.cached.take().unwrap();
self.inner.push(cached).await?;
self.cached = Some(envelope);
Ok(())
}
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved
}

async fn peek(&mut self) -> Result<Option<DateTime<Utc>>, Self::Error> {
if let Some(ref envelope) = self.cached {
Ok(Some(envelope.received_at()))
} else {
self.inner.peek().await
}
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved
}

async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
if let Some(envelope) = self.cached.take() {
Ok(Some(envelope))
} else {
self.inner.pop().await
}
}

async fn flush(mut self) {
if let Some(envelope) = self.cached {
let _ = self.inner.push(envelope).await;
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved
}
self.inner.flush().await;
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack;
use crate::services::buffer::testutils::utils::mock_envelope;

#[tokio::test]
async fn test_caching_stack() {
let inner = MemoryEnvelopeStack::new();
let mut stack = CachingEnvelopeStack::new(inner);

// Create test envelopes with different timestamps
let env1 = mock_envelope(Utc::now());
let env2 = mock_envelope(Utc::now());
let env3 = mock_envelope(Utc::now());

// Push envelopes
stack.push(env1).await.unwrap();
stack.push(env2).await.unwrap();
stack.push(env3).await.unwrap();
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved

// Pop them back
assert!(stack.pop().await.unwrap().is_some()); // Should come from cache
assert!(stack.pop().await.unwrap().is_some()); // Should come from inner
assert!(stack.pop().await.unwrap().is_some()); // Should come from inner
assert!(stack.pop().await.unwrap().is_none()); // Should be empty
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl EnvelopeStack for MemoryEnvelopeStack {
}

async fn peek(&mut self) -> Result<Option<DateTime<Utc>>, Self::Error> {
Ok(self.0.last().map(|e| e.meta().received_at()))
Ok(self.0.last().map(|e| e.received_at()))
}

async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/services/buffer/envelope_stack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use chrono::{DateTime, Utc};

use crate::envelope::Envelope;

pub mod caching;
pub mod memory;
pub mod sqlite;

Expand Down
9 changes: 6 additions & 3 deletions relay-server/src/services/buffer/stack_provider/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::error::Error;
use relay_config::Config;

use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_stack::caching::CachingEnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
Expand Down Expand Up @@ -38,7 +39,7 @@ impl SqliteStackProvider {
}

impl StackProvider for SqliteStackProvider {
type Stack = SqliteEnvelopeStack;
type Stack = CachingEnvelopeStack<SqliteEnvelopeStack>;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, we want to directly use it with the SqliteStackProvider.


async fn initialize(&self) -> InitializationState {
match self.envelope_store.project_key_pairs().await {
Expand All @@ -58,7 +59,7 @@ impl StackProvider for SqliteStackProvider {
stack_creation_type: StackCreationType,
project_key_pair: ProjectKeyPair,
) -> Self::Stack {
SqliteEnvelopeStack::new(
let inner = SqliteEnvelopeStack::new(
self.envelope_store.clone(),
self.batch_size_bytes,
project_key_pair.own_key,
Expand All @@ -69,7 +70,9 @@ impl StackProvider for SqliteStackProvider {
// it was empty, or we never had data on disk for that stack, so we assume by default
// that there is no need to check disk until some data is spooled.
Self::assume_data_on_disk(stack_creation_type),
)
);

CachingEnvelopeStack::new(inner)
}

fn has_store_capacity(&self) -> bool {
Expand Down
Loading