Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spooler): Add EnvelopeStack based on SQLite #3855

Merged
merged 24 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

- Allow metrics summaries with only `count` (for sets). ([#3864](https://github.com/getsentry/relay/pull/3864))

**Internal**:

- Add `EnvelopeStack` and `SQLiteEnvelopeStack` to manage envelopes on disk. ([#3855](https://github.com/getsentry/relay/pull/3855))

## 24.7.1

**Bug Fixes**:
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions migrations/20240724144200_change_index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP INDEX IF EXISTS project_keys;

CREATE INDEX IF NOT EXISTS project_keys_received_at ON envelopes (own_key, sampling_key, received_at);
5 changes: 5 additions & 0 deletions relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,15 @@ axum-extra = { workspace = true, features = ["protobuf"] }
semver = { workspace = true }

[dev-dependencies]
criterion = { workspace = true }
tokio = { workspace = true, features = ['test-util'] }
insta = { workspace = true }
relay-event-schema = { workspace = true, features = ["jsonschema"] }
relay-protocol = { workspace = true, features = ["test"] }
relay-test = { workspace = true }
similar-asserts = { workspace = true }
tempfile = { workspace = true }

[[bench]]
name = "benches"
harness = false
171 changes: 171 additions & 0 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use sqlx::{Pool, Sqlite};
use std::path::PathBuf;
use std::time::Duration;
use tempfile::TempDir;
use tokio::runtime::Runtime;

use relay_base_schema::project::ProjectKey;
use relay_server::{Envelope, EnvelopeStack, SQLiteEnvelopeStack};

fn setup_db(path: &PathBuf) -> Pool<Sqlite> {
let options = SqliteConnectOptions::new()
.filename(path)
.journal_mode(SqliteJournalMode::Wal)
.create_if_missing(true);

let runtime = Runtime::new().unwrap();
runtime.block_on(async {
let db = SqlitePoolOptions::new()
.connect_with(options)
.await
.unwrap();

sqlx::migrate!("../migrations").run(&db).await.unwrap();

db
})
}

async fn reset_db(db: Pool<Sqlite>) {
sqlx::query("DELETE FROM envelopes")
.execute(&db)
.await
.unwrap();
}

fn mock_envelope() -> Box<Envelope> {
let bytes = Bytes::from(
"\
{\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\",\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}\n\
{\"type\":\"attachment\"}\n\
helloworld\n\
",
);
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved

Envelope::parse_bytes(bytes).unwrap()
}

fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.db");
let db = setup_db(&db_path);

let runtime = Runtime::new().unwrap();

let mut group = c.benchmark_group("sqlite_envelope_stack");
group.measurement_time(Duration::from_secs(60));

let disk_batch_size = 1000;
for size in [1_000, 10_000, 100_000].iter() {
group.throughput(Throughput::Elements(*size as u64));

// Benchmark push operations
group.bench_with_input(BenchmarkId::new("push", size), size, |b, &size| {
b.iter_with_setup(
|| {
runtime.block_on(async {
reset_db(db.clone()).await;
});

let stack = SQLiteEnvelopeStack::new(
db.clone(),
disk_batch_size,
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
);

let mut envelopes = Vec::with_capacity(size);
for _ in 0..size {
envelopes.push(mock_envelope());
}

(stack, envelopes)
},
|(mut stack, envelopes)| {
runtime.block_on(async {
for envelope in envelopes {
stack.push(envelope).await.unwrap();
}
});
},
);
});

// Benchmark pop operations
group.bench_with_input(BenchmarkId::new("pop", size), size, |b, &size| {
b.iter_with_setup(
|| {
runtime.block_on(async {
reset_db(db.clone()).await;

let mut stack = SQLiteEnvelopeStack::new(
db.clone(),
disk_batch_size,
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
);

// Pre-fill the stack
for _ in 0..size {
let envelope = mock_envelope();
stack.push(envelope).await.unwrap();
}

stack
})
},
|mut stack| {
runtime.block_on(async {
// Benchmark popping
for _ in 0..size {
stack.pop().await.unwrap();
}
});
},
);
});

// Benchmark mixed push and pop operations
group.bench_with_input(BenchmarkId::new("mixed", size), size, |b, &size| {
b.iter_with_setup(
|| {
runtime.block_on(async {
reset_db(db.clone()).await;
});

SQLiteEnvelopeStack::new(
db.clone(),
disk_batch_size,
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
)
},
|mut stack| {
runtime.block_on(async {
for _ in 0..size {
if rand::random::<bool>() {
let envelope = mock_envelope();
stack.push(envelope).await.unwrap();
} else if stack.pop().await.is_err() {
// If pop fails (empty stack), push instead
let envelope = mock_envelope();
stack.push(envelope).await.unwrap();
}
}
});
},
);
});
}

group.finish();
}

criterion_group!(benches, benchmark_sqlite_envelope_stack);
criterion_main!(benches);
1 change: 1 addition & 0 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,7 @@ impl EnvelopeHeaders<PartialMeta> {
}
}

#[doc(hidden)]
#[derive(Clone, Debug)]
pub struct Envelope {
headers: EnvelopeHeaders,
Expand Down
4 changes: 4 additions & 0 deletions relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ mod statsd;
mod utils;

pub use self::services::spooler::spool_utils;
// Public just for benchmarks.
pub use self::envelope::Envelope;
pub use self::services::spooler::envelope_stack::sqlite::SQLiteEnvelopeStack;
pub use self::services::spooler::envelope_stack::EnvelopeStack;

#[cfg(test)]
mod testutils;
Expand Down
27 changes: 27 additions & 0 deletions relay-server/src/services/spooler/envelope_stack/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::envelope::Envelope;
use std::future::Future;

pub mod sqlite;

/// A stack-like data structure that holds [`Envelope`]s.
pub trait EnvelopeStack {
/// The error type that is returned when an error is encountered during reading or writing the
/// [`EnvelopeStack`].
type Error;

/// Pushes an [`Envelope`] on top of the stack.
#[allow(dead_code)]
fn push(&mut self, envelope: Box<Envelope>) -> impl Future<Output = Result<(), Self::Error>>;

/// Peeks the [`Envelope`] on top of the stack.
///
/// If the stack is empty, an error is returned.
#[allow(dead_code)]
fn peek(&mut self) -> impl Future<Output = Result<&Box<Envelope>, Self::Error>>;

/// Pops the [`Envelope`] on top of the stack.
///
/// If the stack is empty, an error is returned.
#[allow(dead_code)]
fn pop(&mut self) -> impl Future<Output = Result<Box<Envelope>, Self::Error>>;
}
Loading
Loading