Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spooler): Add capacity check for the envelope buffer #3925

Merged
merged 24 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
- Extract client sdk from transaction into profiles. ([#3915](https://github.com/getsentry/relay/pull/3915))
- Extract `user.geo.subregion` into span metrics/indexed. ([#3914](https://github.com/getsentry/relay/pull/3914))
- Add `last_peek` field to the `Priority` struct. ([#3922](https://github.com/getsentry/relay/pull/3922))
- Add `EnvelopeStore` trait and implement `DiskUsage` for tracking disk usage. ([#3925](https://github.com/getsentry/relay/pull/3925))

## 24.7.1

Expand Down
2 changes: 1 addition & 1 deletion relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ pub struct EnvelopeSpool {
min_connections: u32,
/// The maximum size of the buffer to keep, in bytes.
///
/// If not set the befault is 524288000 bytes (500MB).
/// If not set the default is 524288000 bytes (500MB).
#[serde(default = "spool_envelopes_max_disk_size")]
max_disk_size: ByteSize,
/// The maximum bytes to keep in the memory buffer before spooling envelopes to disk, in bytes.
Expand Down
23 changes: 19 additions & 4 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ use relay_config::Config;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use sqlx::{Pool, Sqlite};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tempfile::TempDir;
use tokio::runtime::Runtime;

use relay_base_schema::project::ProjectKey;
use relay_server::{
Envelope, EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore,
Envelope, EnvelopeStack, MemoryChecker, MemoryStat, PolymorphicEnvelopeBuffer,
SqliteEnvelopeStack, SqliteEnvelopeStore,
};

fn setup_db(path: &PathBuf) -> Pool<Sqlite> {
Expand Down Expand Up @@ -72,7 +74,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.db");
let db = setup_db(&db_path);
let envelope_store = SqliteEnvelopeStore::new(db.clone());
let envelope_store = SqliteEnvelopeStore::new(db.clone(), 100);

let runtime = Runtime::new().unwrap();

Expand Down Expand Up @@ -221,6 +223,17 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
let num_projects = 100000;
let envelopes_per_project = 10;

let config: Arc<Config> = Config::from_json_value(serde_json::json!({
"spool": {
"health": {
"max_memory_percent": 1.0
}
}
}))
.unwrap()
.into();
let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());

group.throughput(Throughput::Elements(
num_projects * envelopes_per_project as u64,
));
Expand All @@ -245,7 +258,8 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
},
|envelopes| {
runtime.block_on(async {
let mut buffer = PolymorphicEnvelopeBuffer::from_config(&Config::default());
let mut buffer =
PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone());
for envelope in envelopes.into_iter() {
buffer.push(envelope).await.unwrap();
}
Expand Down Expand Up @@ -274,7 +288,8 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
},
|envelopes| {
runtime.block_on(async {
let mut buffer = PolymorphicEnvelopeBuffer::from_config(&Config::default());
let mut buffer =
PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone());
let n = envelopes.len();
for envelope in envelopes.into_iter() {
let public_key = envelope.meta().public_key();
Expand Down
17 changes: 6 additions & 11 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub fn event_id_from_items(items: &Items) -> Result<Option<EventId>, BadStoreReq
///
/// Queueing can fail if the queue exceeds `envelope_buffer_size`. In this case, `Err` is
/// returned and the envelope is not queued.
fn queue_envelope(
async fn queue_envelope(
state: &ServiceState,
mut managed_envelope: ManagedEnvelope,
) -> Result<(), BadStoreRequest> {
Expand Down Expand Up @@ -307,12 +307,14 @@ fn queue_envelope(

match state.envelope_buffer() {
Some(buffer) => {
if !buffer.has_capacity().await {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second review, I think we should make this method sync, because we should avoid blocking the request handler on disk read / writes under all circumstances. One option to make it sync would be to change GuardedEnvelopeBuffer::has_capacity to a try_lock, and cache the resulting bool:

        match self.inner.try_lock() {
            Ok(guard) => {
                let c = guard.backend.has_capacity();
                self.cached_capacity.store(c);
                c
            },
            Err(_) => self.cached_capacity.load()
        }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this could be an idea, given the intrinsic delay of memory and disk usage calculation, adding one by caching it should not be a big deal. My main worry with this is that we can't guarantee updated values in any way if lock contention is arbitrarily high.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds acceptable to me. We can emit a counter metric to track how often we emit cached vs fresh capacity. Once we transform this to a service, the cached capacity will be updated differently, anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I added the metric because I am interested.

return Err(BadStoreRequest::QueueFailed);
}

// NOTE: This assumes that a `prefetch` has already been scheduled for both the
// envelope's projects. See `handle_check_envelope`.
relay_log::trace!("Pushing envelope to V2 buffer");

// TODO: Sync-check whether the buffer has capacity.
// Otherwise return `QueueFailed`.
buffer.defer_push(envelope);
}
None => {
Expand Down Expand Up @@ -347,13 +349,6 @@ pub async fn handle_envelope(
)
}

// TODO(jjbayer): Move this check to spool impl
if state.memory_checker().check_memory().is_exceeded() {
// NOTE: Long-term, we should not reject the envelope here, but spool it to disk instead.
// This will be fixed with the new spool implementation.
return Err(BadStoreRequest::QueueFailed);
};

let mut managed_envelope = ManagedEnvelope::new(
envelope,
state.outcome_aggregator().clone(),
Expand Down Expand Up @@ -393,7 +388,7 @@ pub async fn handle_envelope(
return Err(BadStoreRequest::Overflow(offender));
}

queue_envelope(state, managed_envelope)?;
queue_envelope(state, managed_envelope).await?;

if checked.rate_limits.is_limited() {
// Even if some envelope items have been queued, there might be active rate limits on
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ pub use self::services::buffer::{
EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore,
}; // pub for benchmarks
pub use self::services::spooler::spool_utils;
pub use self::utils::{MemoryChecker, MemoryStat}; // pub for benchmarks

#[cfg(test)]
mod testutils;
Expand Down
6 changes: 5 additions & 1 deletion relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,11 @@ impl ServiceState {
upstream_relay.clone(),
global_config.clone(),
);
let envelope_buffer = GuardedEnvelopeBuffer::from_config(&config).map(Arc::new);
let envelope_buffer = GuardedEnvelopeBuffer::from_config(
&config,
MemoryChecker::new(memory_stat.clone(), config.clone()),
)
.map(Arc::new);
ProjectCacheService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
Expand Down
86 changes: 60 additions & 26 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ use relay_config::Config;

use crate::envelope::Envelope;
use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
use crate::services::buffer::envelope_stack::{EnvelopeStack, StackProvider};
use crate::services::buffer::sqlite_envelope_store::SqliteEnvelopeStoreError;
use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
use crate::services::buffer::stacks_manager::memory::MemoryStacksManager;
use crate::services::buffer::stacks_manager::sqlite::SqliteStacksManager;
use crate::services::buffer::stacks_manager::{Capacity, StacksManager};
use crate::statsd::{RelayCounters, RelayGauges};
use crate::utils::MemoryChecker;

/// Polymorphic envelope buffering interface.
///
Expand All @@ -25,20 +27,21 @@ use crate::statsd::{RelayCounters, RelayGauges};
#[allow(private_interfaces)]
pub enum PolymorphicEnvelopeBuffer {
/// An enveloper buffer that uses in-memory envelopes stacks.
InMemory(EnvelopeBuffer<MemoryStackProvider>),
InMemory(EnvelopeBuffer<MemoryStacksManager>),
/// An enveloper buffer that uses sqlite envelopes stacks.
#[allow(dead_code)]
Sqlite(EnvelopeBuffer<SqliteStackProvider>),
Sqlite(EnvelopeBuffer<SqliteStacksManager>),
}

impl PolymorphicEnvelopeBuffer {
/// Creates either a memory-based or a disk-based envelope buffer,
/// depending on the given configuration.
pub fn from_config(config: &Config) -> Self {
pub fn from_config(config: &Config, memory_checker: MemoryChecker) -> Self {
if config.spool_envelopes_path().is_some() {
panic!("Disk backend not yet supported for spool V2");
}
Self::InMemory(EnvelopeBuffer::<MemoryStackProvider>::new())

Self::InMemory(EnvelopeBuffer::<MemoryStacksManager>::new(memory_checker))
}

/// Adds an envelope to the buffer.
Expand Down Expand Up @@ -71,13 +74,21 @@ impl PolymorphicEnvelopeBuffer {

/// Marks a project as ready or not ready.
///
/// The buffer reprioritizes its envelopes based on this information.
/// The buffer re-prioritizes its envelopes based on this information.
pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
match self {
Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready),
Self::InMemory(buffer) => buffer.mark_ready(project, is_ready),
}
}

/// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s.
pub fn has_capacity(&self) -> bool {
match self {
Self::Sqlite(buffer) => buffer.has_capacity(),
Self::InMemory(buffer) => buffer.has_capacity(),
}
}
}

/// Error that occurs while interacting with the envelope buffer.
Expand All @@ -86,6 +97,9 @@ pub enum EnvelopeBufferError {
#[error("sqlite")]
Sqlite(#[from] SqliteEnvelopeStackError),

#[error("failed to push envelope to the buffer")]
PushFailed,

#[error("impossible")]
Impossible(#[from] Infallible),
}
Expand All @@ -95,7 +109,7 @@ pub enum EnvelopeBufferError {
/// Envelope stacks are organized in a priority queue, and are reprioritized every time an envelope
/// is pushed, popped, or when a project becomes ready.
#[derive(Debug)]
struct EnvelopeBuffer<P: StackProvider> {
struct EnvelopeBuffer<P: StacksManager> {
/// The central priority queue.
priority_queue: priority_queue::PriorityQueue<QueueItem<StackKey, P::Stack>, Priority>,
/// A lookup table to find all stacks involving a project.
Expand All @@ -107,32 +121,32 @@ struct EnvelopeBuffer<P: StackProvider> {
stack_provider: P,
}

impl EnvelopeBuffer<MemoryStackProvider> {
/// Creates an empty buffer.
pub fn new() -> Self {
impl EnvelopeBuffer<MemoryStacksManager> {
/// Creates an empty memory-based buffer.
pub fn new(memory_checker: MemoryChecker) -> Self {
Self {
stacks_by_project: Default::default(),
priority_queue: Default::default(),
stack_provider: MemoryStackProvider,
stack_provider: MemoryStacksManager::new(memory_checker),
}
}
}

#[allow(dead_code)]
impl EnvelopeBuffer<SqliteStackProvider> {
/// Creates an empty buffer.
impl EnvelopeBuffer<SqliteStacksManager> {
/// Creates an empty sqlite-based buffer.
pub async fn new(config: &Config) -> Result<Self, SqliteEnvelopeStoreError> {
Ok(Self {
stacks_by_project: Default::default(),
priority_queue: Default::default(),
stack_provider: SqliteStackProvider::new(config).await?,
stack_provider: SqliteStacksManager::new(config).await?,
})
}
}

impl<P: StackProvider> EnvelopeBuffer<P>
impl<P: StacksManager> EnvelopeBuffer<P>
where
EnvelopeBufferError: std::convert::From<<P::Stack as EnvelopeStack>::Error>,
EnvelopeBufferError: From<<P::Stack as EnvelopeStack>::Error>,
{
/// Pushes an envelope to the appropriate envelope stack and reprioritizes the stack.
///
Expand Down Expand Up @@ -221,7 +235,7 @@ where
Ok(Some(envelope))
}

/// Reprioritizes all stacks that involve the given project key by setting it to "ready".
/// Re-prioritizes all stacks that involve the given project key by setting it to "ready".
pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
let mut changed = false;
if let Some(stack_keys) = self.stacks_by_project.get(project) {
Expand Down Expand Up @@ -272,6 +286,10 @@ where
);
}

pub fn has_capacity(&self) -> bool {
matches!(self.stack_provider.capacity(), Capacity::Free)
}

fn pop_stack(&mut self, stack_key: StackKey) {
for project_key in stack_key.iter() {
self.stacks_by_project
Expand Down Expand Up @@ -420,14 +438,16 @@ impl Readiness {
#[cfg(test)]
mod tests {
use std::str::FromStr;
use uuid::Uuid;
use std::sync::Arc;

use relay_common::Dsn;
use relay_event_schema::protocol::EventId;
use relay_sampling::DynamicSamplingContext;
use uuid::Uuid;

use crate::envelope::{Item, ItemType};
use crate::extractors::RequestMeta;
use crate::utils::MemoryStat;

use super::*;

Expand Down Expand Up @@ -461,9 +481,23 @@ mod tests {
envelope
}

fn mock_memory_checker() -> MemoryChecker {
let config: Arc<_> = Config::from_json_value(serde_json::json!({
"spool": {
"health": {
"max_memory_percent": 1.0
}
}
}))
.unwrap()
.into();

MemoryChecker::new(MemoryStat::default(), config.clone())
}

#[tokio::test]
async fn test_insert_pop() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new(mock_memory_checker());

let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
Expand Down Expand Up @@ -547,7 +581,7 @@ mod tests {

#[tokio::test]
async fn test_project_internal_order() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new(mock_memory_checker());

let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

Expand All @@ -574,7 +608,7 @@ mod tests {

#[tokio::test]
async fn test_sampling_projects() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new(mock_memory_checker());

let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
Expand Down Expand Up @@ -652,7 +686,7 @@ mod tests {

assert_ne!(stack_key1, stack_key2);

let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new(mock_memory_checker());
buffer
.push(new_envelope(project_key1, Some(project_key2), None))
.await
Expand All @@ -666,7 +700,7 @@ mod tests {

#[tokio::test]
async fn test_last_peek_internal_order() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new(mock_memory_checker());

let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let event_id_1 = EventId::new();
Expand Down
6 changes: 0 additions & 6 deletions relay-server/src/services/buffer/envelope_stack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,3 @@ pub trait EnvelopeStack: Send + std::fmt::Debug {
/// Pops the [`Envelope`] on top of the stack.
fn pop(&mut self) -> impl Future<Output = Result<Option<Box<Envelope>>, Self::Error>>;
}

pub trait StackProvider: std::fmt::Debug {
type Stack: EnvelopeStack;

fn create_stack(&self, envelope: Box<Envelope>) -> Self::Stack;
}
Loading
Loading