chore(spool): Enable throttled periodic unspool (#2993)
Currently, in order to trigger an unspool of buffered envelopes, a new
project state update must be received from the upstream, which can
sometimes take a long time.

To make the unspool action more predictable, this change enables a periodic
unspool initiated by the project cache. This way:
* the spool service does not have to know when to unspool
* the spool service just has to serve requests and communicate the
results back
* the project cache takes care of initiating the action and only
requests an unspool of the envelopes when certain criteria are met
(see the sketch after this list)
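Conceptually, the project cache now owns a throttled timer: every `unspool_interval` (plus a growing backoff while there is nothing to dequeue) it checks its index and asks the buffer to unspool. Below is a minimal, self-contained sketch of that loop under stated assumptions: the simplified `Broker` type is hypothetical, the real broker multiplexes this timer with other messages in a `select!` loop, and only the 100ms base interval mirrors the default added in this commit.

```rust
use std::time::Duration;
use tokio::time::sleep;

/// Simplified stand-in for the project cache broker (hypothetical type for illustration).
struct Broker {
    /// Extra delay added while attempts are fruitless; reset after a successful dequeue.
    backoff: Duration,
    /// Pretend index of project keys that still have spooled envelopes.
    spooled: Vec<&'static str>,
}

impl Broker {
    /// Base throttle interval, mirroring the 100ms default added in this commit.
    const BASE_INTERVAL: Duration = Duration::from_millis(100);

    /// Delay until the next unspool attempt: base interval plus current backoff.
    fn next_unspool_attempt(&self) -> Duration {
        Self::BASE_INTERVAL + self.backoff
    }

    /// One periodic attempt: dequeue if possible, otherwise grow the backoff.
    fn handle_periodic_unspool(&mut self) {
        if let Some(key) = self.spooled.pop() {
            println!("unspooled envelopes for {key}");
            self.backoff = Duration::ZERO;
        } else {
            self.backoff += Duration::from_millis(100);
            println!("nothing to unspool, next attempt in {:?}", self.next_unspool_attempt());
        }
    }
}

#[tokio::main]
async fn main() {
    let mut broker = Broker {
        backoff: Duration::ZERO,
        spooled: vec!["project-a", "project-b"],
    };

    // The real service multiplexes this timer with other messages in a `select!` loop;
    // here the attempts are simply driven in sequence.
    for _ in 0..4 {
        sleep(broker.next_unspool_attempt()).await;
        broker.handle_periodic_unspool();
    }
}
```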
olksdr authored Jan 25, 2024
1 parent a29d5b2 commit c892dd9
Showing 6 changed files with 197 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@
- Copy event measurements to span & normalize span measurements. ([#2953](https://github.com/getsentry/relay/pull/2953))
- Add `allow_negative` to `BuiltinMeasurementKey`. Filter out negative BuiltinMeasurements if `allow_negative` is false. ([#2982](https://github.com/getsentry/relay/pull/2982))
- Add possibility to block metrics or their tags with glob-patterns. ([#2954](https://github.com/getsentry/relay/pull/2954), [#2973](https://github.com/getsentry/relay/pull/2973))
- Enable throttled periodic unspool of the buffered envelopes. ([#2993](https://github.com/getsentry/relay/pull/2993))

**Bug Fixes**:

14 changes: 14 additions & 0 deletions relay-config/src/config.rs
@@ -826,6 +826,11 @@ fn spool_envelopes_max_connections() -> u32 {
20
}

/// Default interval to unspool buffered envelopes, 100ms.
fn spool_envelopes_unspool_interval() -> u64 {
100
}

/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
@@ -849,6 +854,9 @@ pub struct EnvelopeSpool {
/// This is a hard upper bound and defaults to 524288000 bytes (500MB).
#[serde(default = "spool_envelopes_max_memory_size")]
max_memory_size: ByteSize,
/// The interval in milliseconds to trigger unspool.
#[serde(default = "spool_envelopes_unspool_interval")]
unspool_interval: u64,
}

impl Default for EnvelopeSpool {
@@ -859,6 +867,7 @@ impl Default for EnvelopeSpool {
min_connections: spool_envelopes_min_connections(),
max_disk_size: spool_envelopes_max_disk_size(),
max_memory_size: spool_envelopes_max_memory_size(),
unspool_interval: spool_envelopes_unspool_interval(), // 100ms
}
}
}
@@ -1936,6 +1945,11 @@ impl Config {
self.values.spool.envelopes.min_connections
}

/// Unspool interval in milliseconds.
pub fn spool_envelopes_unspool_interval(&self) -> Duration {
Duration::from_millis(self.values.spool.envelopes.unspool_interval)
}

/// The maximum size of the buffer, in bytes.
pub fn spool_envelopes_max_disk_size(&self) -> usize {
self.values.spool.envelopes.max_disk_size.as_bytes()
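For completeness, the new `unspool_interval` field should pick up its 100ms default when omitted and accept an override like any other `spool.envelopes` setting. Below is a hedged sketch of that behaviour using a simplified, standalone mirror of the struct: the serde attributes follow the diff above, but the mirror type and the use of `serde_yaml` are illustrative assumptions, not Relay's actual config plumbing.

```rust
use serde::Deserialize;

/// Simplified, standalone mirror of the `EnvelopeSpool` settings above (illustrative only).
#[derive(Debug, Deserialize)]
struct EnvelopeSpool {
    /// Interval in milliseconds between unspool attempts; defaults to 100.
    #[serde(default = "default_unspool_interval")]
    unspool_interval: u64,
}

fn default_unspool_interval() -> u64 {
    100
}

fn main() {
    // Overriding the interval in a spool config section would look roughly like this.
    let spool: EnvelopeSpool = serde_yaml::from_str("unspool_interval: 250").unwrap();
    assert_eq!(spool.unspool_interval, 250);

    // Omitting the key falls back to the 100ms default, as in `spool_envelopes_unspool_interval`.
    let spool: EnvelopeSpool = serde_yaml::from_str("{}").unwrap();
    assert_eq!(spool.unspool_interval, 100);
    println!("parsed unspool_interval = {}ms", spool.unspool_interval);
}
```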
220 changes: 172 additions & 48 deletions relay-server/src/services/project_cache.rs
@@ -1,6 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;

use relay_base_schema::project::ProjectKey;
use relay_config::{Config, RelayMode};
@@ -28,7 +29,9 @@ use crate::services::test_store::TestStore;
use crate::services::upstream::UpstreamRelay;

use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
use crate::utils::{self, BufferGuard, GarbageDisposal, ManagedEnvelope};
use crate::utils::{
self, BufferGuard, GarbageDisposal, ManagedEnvelope, RetryBackoff, SleepHandle,
};

/// Requests a refresh of a project state from one of the available sources.
///
@@ -490,6 +493,8 @@ struct ProjectCacheBroker {
buffer_guard: Arc<BufferGuard>,
/// Index of the buffered project keys.
index: BTreeMap<ProjectKey, BTreeSet<QueueKey>>,
/// Handle to schedule the next unspool of buffered envelopes.
buffer_unspool_handle: SleepHandle,
/// Backoff to apply between unsuccessful unspool attempts.
buffer_unspool_backoff: RetryBackoff,
buffer: Addr<Buffer>,
global_config: GlobalConfigStatus,
}
@@ -502,14 +507,8 @@ struct ProjectCacheBroker {
enum GlobalConfigStatus {
/// Global config needed for envelope processing.
Ready(Arc<GlobalConfig>),
/// Project keys waiting for global config to dequeue from buffer.
///
/// These project keys were used to try to dequeue the buffer, but were blocked because we
/// lacked a global config. When global config arrives, we will request project states for these keys again
/// which will trigger another dequeue that should be succesful. The reason we don't dequeue
/// directly when we receive [`GlobalConfig`] is that the [`ProjectState`]s might have expired in the meantime,
/// which would drop the envelopes when they are sent to ProjectCacheBroker::handle_processing.
Pending(BTreeSet<ProjectKey>),
/// The global config is not fetched yet.
Pending,
}

impl GlobalConfigStatus {
@@ -520,36 +519,7 @@ impl GlobalConfigStatus {

impl ProjectCacheBroker {
fn set_global_config(&mut self, global_config: Arc<GlobalConfig>) {
let GlobalConfigStatus::Pending(project_keys) = std::mem::replace(
&mut self.global_config,
GlobalConfigStatus::Ready(global_config),
) else {
return;
};

relay_log::info!(
"global config received: {} projects were pending",
project_keys.len()
);

// Check which of the pending projects can be dequeued now:
for project_key in project_keys {
let project_cache = self.services.project_cache.clone();

// Check for a cached state:
// 1. If the state is cached and valid, trigger an immediate dequeue. There could be a
// fetch running in the background, but this is not guaranteed.
// 2. If the state is not cached, `get_cached_state` will trigger a fetch in the
// background. Once the fetch completes, the project will automatically cause a
// dequeue by sending `UpdateProjectState` to the broker.
let state = self
.get_or_create_project(project_key)
.get_cached_state(project_cache, false);

if state.map_or(false, |s| !s.invalid()) {
self.dequeue(project_key);
}
}
self.global_config = GlobalConfigStatus::Ready(global_config);
}

/// Adds the value to the queue for the provided key.
@@ -565,8 +535,7 @@ impl ProjectCacheBroker {
/// forwarded to `handle_processing`.
pub fn dequeue(&mut self, partial_key: ProjectKey) {
// If we don't yet have the global config, we will defer dequeuing until we do.
if let GlobalConfigStatus::Pending(keys) = &mut self.global_config {
keys.insert(partial_key);
if let GlobalConfigStatus::Pending = self.global_config {
return;
}

@@ -683,8 +652,10 @@ impl ProjectCacheBroker {
no_cache,
);

if !state.invalid() {
self.dequeue(project_key);
// Schedule unspool if nothing is running at the moment.
if !self.buffer_unspool_backoff.started() {
self.buffer_unspool_backoff.reset();
self.schedule_unspool();
}
}

@@ -915,6 +886,64 @@ impl ProjectCacheBroker {
}
}

/// Returns backoff timeout for an unspool attempt.
fn next_unspool_attempt(&mut self) -> Duration {
self.config.spool_envelopes_unspool_interval() + self.buffer_unspool_backoff.next_backoff()
}

fn schedule_unspool(&mut self) {
if self.buffer_unspool_handle.is_idle() {
// Set the time for the next attempt.
let wait = self.next_unspool_attempt();
self.buffer_unspool_handle.set(wait);
}
}

/// Iterates the buffer index and tries to unspool the envelopes for projects with a valid
/// state.
///
/// This makes sure we keep moving the unspool forward, even if we do not fetch any project
/// state updates, and can still process data based on the existing cache.
fn handle_periodic_unspool(&mut self) {
self.buffer_unspool_handle.reset();

// If there is nothing spooled, schedule the next check a little bit later.
if self.index.is_empty() {
self.schedule_unspool();
return;
}

let keys = self.index.keys().cloned().collect::<Box<[_]>>();
let mut dequeued = false;

for project_key in keys.iter() {
if self.projects.get_mut(project_key).map_or(false, |project| {
// Returns `Some` if the project state is cached, otherwise `None`; also triggers a
// refresh in the background.
project
.get_cached_state(self.services.project_cache.clone(), false)
// Make sure the state is also valid.
.map_or(false, |state| !state.invalid())
}) {
// Do *not* attempt to unspool if the assigned permits are over the low watermark.
if !self.buffer_guard.is_below_low_watermark() {
self.schedule_unspool();
return;
}

self.dequeue(*project_key);
dequeued = true;
}
}

// Reset the backoff after a successful dequeue; otherwise, the next attempt is backed off.
if dequeued {
self.buffer_unspool_backoff.reset();
}
// Schedule unspool once we are done.
self.schedule_unspool();
}

fn handle_message(&mut self, message: ProjectCache) {
match message {
ProjectCache::RequestUpdate(message) => self.handle_request_update(message),
@@ -1022,7 +1051,7 @@ impl Service for ProjectCacheService {
}
global_config::Status::Pending => {
relay_log::info!("waiting for global config");
GlobalConfigStatus::Pending(BTreeSet::new())
GlobalConfigStatus::Pending
}
};

@@ -1035,12 +1064,18 @@
config: config.clone(),
projects: hashbrown::HashMap::new(),
garbage_disposal: GarbageDisposal::new(),
source: ProjectSource::start(config, services.upstream_relay.clone(), redis),
source: ProjectSource::start(
config.clone(),
services.upstream_relay.clone(),
redis,
),
services,
state_tx,
buffer_tx,
buffer_guard,
index: BTreeMap::new(),
buffer_unspool_handle: SleepHandle::idle(),
buffer_unspool_backoff: RetryBackoff::new(config.http_max_retry_interval()),
buffer,
global_config,
};
@@ -1062,6 +1097,7 @@
// permits in `BufferGuard` available. Currently this is 50%.
Some(managed_envelope) = buffer_rx.recv() => broker.handle_processing(managed_envelope),
_ = ticker.tick() => broker.evict_stale_project_caches(),
() = &mut broker.buffer_unspool_handle => broker.handle_periodic_unspool(),
Some(message) = rx.recv() => broker.handle_message(message),
else => break,
}
@@ -1097,10 +1133,11 @@ mod tests {
use std::time::Duration;

use relay_test::mock_service;
use tokio::select;
use uuid::Uuid;

use crate::services::processor::ProcessingGroup;
use crate::testutils::empty_envelope;
use crate::testutils::{empty_envelope, empty_envelope_with_dsn};

use super::*;

@@ -1173,7 +1210,9 @@ mod tests {
buffer_guard,
index: BTreeMap::new(),
buffer: buffer.clone(),
global_config: GlobalConfigStatus::Pending(BTreeSet::new()),
global_config: GlobalConfigStatus::Pending,
buffer_unspool_handle: SleepHandle::idle(),
buffer_unspool_backoff: RetryBackoff::new(Duration::from_millis(100)),
},
buffer,
)
@@ -1242,7 +1281,7 @@
envelopes.pop().unwrap();
assert_eq!(buffer_guard.available(), 1);

// Till now we should have enqueued 5 envleopes and dequeued only 1, it means the index is
// Till now we should have enqueued 5 envelopes and dequeued only 1, it means the index is
// still populated with same keys and values.
assert_eq!(broker.index.keys().len(), 1);
assert_eq!(broker.index.values().count(), 1);
Expand All @@ -1268,4 +1307,89 @@ mod tests {
assert!(buffer_rx.try_recv().is_err())
}
}

#[tokio::test]
async fn periodic_unspool() {
relay_log::init_test!();

let num_permits = 50;
let buffer_guard: Arc<_> = BufferGuard::new(num_permits).into();
let services = mocked_services();
let (state_tx, _) = mpsc::unbounded_channel();
let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel();
let (mut broker, _buffer_svc) =
project_cache_broker_setup(services.clone(), buffer_guard.clone(), state_tx, buffer_tx)
.await;

broker.global_config = GlobalConfigStatus::Ready(Default::default());
let (tx_update, mut rx_update) = mpsc::unbounded_channel();
let (tx_assert, mut rx_assert) = mpsc::unbounded_channel();

let dsn1 = "111d836b15bb49d7bbf99e64295d995b";
let dsn2 = "eeed836b15bb49d7bbf99e64295d995b";

// Send and spool some envelopes.
for dsn in [dsn1, dsn2] {
let envelope = buffer_guard
.enter(
empty_envelope_with_dsn(dsn),
services.outcome_aggregator.clone(),
services.test_store.clone(),
ProcessingGroup::Ungrouped,
)
.unwrap();

let message = ValidateEnvelope { envelope };

broker.handle_validate_envelope(message);
tokio::time::sleep(Duration::from_millis(200)).await;
// Nothing will be dequeued.
assert!(buffer_rx.try_recv().is_err())
}

// Emulate the project cache service loop.
tokio::task::spawn(async move {
loop {
select! {

Some(assert) = rx_assert.recv() => {
assert_eq!(broker.index.keys().len(), assert);
assert_eq!(broker.index.values().count(), assert);
},
Some(update) = rx_update.recv() => broker.merge_state(update),
() = &mut broker.buffer_unspool_handle => broker.handle_periodic_unspool(),
}
}
});

// Before updating any project states.
tx_assert.send(2).unwrap();

let update_dsn1_project_state = UpdateProjectState {
project_key: ProjectKey::parse(dsn1).unwrap(),
state: ProjectState::allowed().into(),
no_cache: false,
};

tx_update.send(update_dsn1_project_state).unwrap();
assert!(buffer_rx.recv().await.is_some());
// One of the projects should be unspooled.
tx_assert.send(1).unwrap();

// Schedule some work...
tokio::time::sleep(Duration::from_secs(2)).await;

let update_dsn2_project_state = UpdateProjectState {
project_key: ProjectKey::parse(dsn2).unwrap(),
state: ProjectState::allowed().into(),
no_cache: false,
};

tx_update.send(update_dsn2_project_state).unwrap();
assert!(buffer_rx.recv().await.is_some());
// The last project should be unspooled.
tx_assert.send(0).unwrap();
// Make sure the last assert is tested.
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
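One detail worth calling out in `handle_periodic_unspool` above is that the unspool is throttled not only by time but also by memory pressure: it stops as soon as the `BufferGuard` permits are no longer below the low watermark. Here is a small illustrative sketch of that gate, assuming a permit-counting guard with a 50% threshold; the `Guard` type and the numbers are made up for illustration, only the watermark idea comes from the diff.

```rust
/// Illustrative permit counter; Relay's real `BufferGuard` tracks envelope permits.
struct Guard {
    capacity: usize,
    used: usize,
}

impl Guard {
    /// "Below the low watermark" means fewer than half of the permits are in use,
    /// so it is safe to pull more envelopes out of the spool.
    fn is_below_low_watermark(&self) -> bool {
        self.used < self.capacity / 2
    }
}

fn main() {
    let mut guard = Guard { capacity: 50, used: 10 };
    let spooled_projects = ["project-a", "project-b", "project-c"];

    for project in spooled_projects {
        // Stop unspooling as soon as in-flight envelopes reach the watermark;
        // the periodic timer retries later, like `handle_periodic_unspool` re-scheduling itself.
        if !guard.is_below_low_watermark() {
            println!("watermark reached, deferring unspool of {project}");
            break;
        }
        guard.used += 10; // pretend each dequeue admits a batch of envelopes
        println!("unspooled {project}");
    }
}
```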
8 changes: 5 additions & 3 deletions relay-server/src/testutils.rs
@@ -98,9 +98,11 @@ pub fn new_envelope<T: Into<String>>(with_dsc: bool, transaction_name: T) -> Box
}

pub fn empty_envelope() -> Box<Envelope> {
let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
.parse()
.unwrap();
empty_envelope_with_dsn("e12d836b15bb49d7bbf99e64295d995b")
}

pub fn empty_envelope_with_dsn(dsn: &str) -> Box<Envelope> {
let dsn = format!("https://{dsn}:@sentry.io/42").parse().unwrap();

let mut envelope = Envelope::from_request(Some(EventId::new()), RequestMeta::new(dsn));
envelope.add_item(Item::new(ItemType::Event));
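Finally, the `testutils.rs` change turns `empty_envelope` into a thin wrapper over a new `empty_envelope_with_dsn`, so tests such as `periodic_unspool` can spool envelopes for two distinct projects. A tiny standalone sketch of how the DSN string is assembled from a public key; it mirrors the `format!` call above, while the surrounding `main` is illustrative only.

```rust
/// Builds a test DSN from a public key, mirroring `empty_envelope_with_dsn` above.
fn dsn_for(public_key: &str) -> String {
    format!("https://{public_key}:@sentry.io/42")
}

fn main() {
    // Two distinct project keys, as used by the `periodic_unspool` test above.
    let dsn1 = dsn_for("111d836b15bb49d7bbf99e64295d995b");
    let dsn2 = dsn_for("eeed836b15bb49d7bbf99e64295d995b");
    assert_ne!(dsn1, dsn2);
    println!("{dsn1}\n{dsn2}");
}
```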
