feat(server): Delay serving global config in managed mode before upstream response #2697

Merged
merged 43 commits into from
Nov 21, 2023
Changes from 10 commits
Commits
43 commits
370eb3b
init
TBS1996 Nov 8, 2023
8634049
wip
TBS1996 Nov 8, 2023
b250419
wip
TBS1996 Nov 8, 2023
8bc4c6b
wip
TBS1996 Nov 8, 2023
f155001
wip
TBS1996 Nov 8, 2023
0266eb0
wip
TBS1996 Nov 9, 2023
d555e7b
wip
TBS1996 Nov 9, 2023
c6206bc
wip
TBS1996 Nov 9, 2023
0cdffda
wip
TBS1996 Nov 9, 2023
dc13b9c
wip
TBS1996 Nov 9, 2023
9600a42
wip
TBS1996 Nov 9, 2023
397c477
wip
TBS1996 Nov 9, 2023
0e9a04e
wip
TBS1996 Nov 9, 2023
6b68257
wip
TBS1996 Nov 9, 2023
1a68659
wip
TBS1996 Nov 9, 2023
dc3efad
Merge branch 'master' into tor/gcbuffer
TBS1996 Nov 9, 2023
9e1d168
wip
TBS1996 Nov 9, 2023
249b77b
wip
TBS1996 Nov 10, 2023
0c944c3
wip
TBS1996 Nov 10, 2023
03758eb
Merge branch 'master' into tor/gcbuffer
TBS1996 Nov 10, 2023
522ea67
wip
TBS1996 Nov 10, 2023
3aebe13
fix changelog
TBS1996 Nov 10, 2023
c35d4ce
address feedback
TBS1996 Nov 10, 2023
a8b8ef0
Merge branch 'master' into tor/gcbuffer
TBS1996 Nov 13, 2023
cd3b1ef
replace send with send_replace
TBS1996 Nov 13, 2023
33dd47a
fix lint
TBS1996 Nov 13, 2023
5955890
formatting
TBS1996 Nov 13, 2023
faf7794
questionable commit
TBS1996 Nov 13, 2023
6b75701
add test
TBS1996 Nov 13, 2023
3d3ec26
speed up test
TBS1996 Nov 13, 2023
2b5dde1
nit
TBS1996 Nov 13, 2023
fa9e47e
name changes
TBS1996 Nov 13, 2023
fbd2fc8
nit
TBS1996 Nov 13, 2023
19a366a
Waiting->Pending
TBS1996 Nov 14, 2023
a6cd5f1
nit
TBS1996 Nov 15, 2023
06e262c
Merge branch 'master' into tor/gcbuffer
TBS1996 Nov 15, 2023
86a396f
address feedback
TBS1996 Nov 17, 2023
1e8d941
Merge branch 'master' into tor/gcbuffer
TBS1996 Nov 17, 2023
a6dd1c7
fix changelog entry
TBS1996 Nov 17, 2023
96fb0e1
sleep before assert empty
TBS1996 Nov 17, 2023
f5adc52
use timeout in assert_empty
TBS1996 Nov 19, 2023
887bc98
Merge branch 'master' into tor/gcbuffer
TBS1996 Nov 21, 2023
4bb1cbf
merge
TBS1996 Nov 21, 2023
8 changes: 3 additions & 5 deletions relay-server/src/actors/global_config.rs
@@ -141,7 +141,7 @@ pub enum Status {
///
/// This variant should never be sent after the first `Ready` has occurred.
#[default]
-Waiting,
+Pending,
}

impl Status {
@@ -153,7 +153,7 @@ impl Status {
pub fn get_ready_or_default(self) -> Arc<GlobalConfig> {
match self {
Status::Ready(global_config) => global_config,
-Status::Waiting => Arc::default(),
+Status::Pending => Arc::default(),
}
}
}
@@ -188,7 +188,7 @@ impl GlobalConfigService {
/// Creates a new [`GlobalConfigService`].
pub fn new(config: Arc<Config>, upstream: Addr<UpstreamRelay>) -> Self {
let (internal_tx, internal_rx) = mpsc::channel(1);
-let (global_config_watch, _) = watch::channel(Status::Waiting);
+let (global_config_watch, _) = watch::channel(Status::Pending);

Self {
config,
@@ -262,8 +262,6 @@ impl GlobalConfigService {
if !self.global_config_watch.borrow().is_ready() {
relay_log::info!("received global config from upstream");
}
-// Notifying subscribers only fails when there are no
-// subscribers.
self.global_config_watch
.send_replace(Status::Ready(Arc::new(global_config)));
Contributor Author

We replace send with send_replace because, if there is no receiver yet, send_replace still stores the value, so a receiver constructed later will see it.
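
The difference can be illustrated with a toy model. This is only a sketch of the semantics described in the comment above, not tokio's implementation: `ToyWatch` and its methods are hypothetical names made up for this example, and the string values stand in for the `Status` variants.

```python
class ToyWatch:
    """Toy stand-in for a watch channel (hypothetical, not tokio's code)."""

    def __init__(self, initial):
        self.value = initial
        self.receivers = 0

    def subscribe(self):
        # A new receiver observes whatever value is currently stored.
        self.receivers += 1
        return self.value

    def send(self, value):
        # Models `send`: with no receivers it fails and keeps the old value.
        if self.receivers == 0:
            return False
        self.value = value
        return True

    def send_replace(self, value):
        # Models `send_replace`: always stores the value, returns the old one.
        old, self.value = self.value, value
        return old


via_send = ToyWatch("Pending")
via_send.send("Ready")  # rejected: nobody has subscribed yet
late_send = via_send.subscribe()

via_replace = ToyWatch("Pending")
via_replace.send_replace("Ready")
late_replace = via_replace.subscribe()
```

In this model a subscriber created after `send` still sees `"Pending"`, while a subscriber created after `send_replace` sees `"Ready"`, which is the behavior the change relies on.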

success = true;
47 changes: 25 additions & 22 deletions relay-server/src/actors/project_cache.rs
@@ -479,6 +479,12 @@ enum GlobalConfigStatus {
Pending(BTreeSet<ProjectKey>),
}

+impl GlobalConfigStatus {
+fn is_ready(&self) -> bool {
+matches!(self, GlobalConfigStatus::Ready(_))
+}
+}

impl ProjectCacheBroker {
fn set_global_config(&mut self, global_config: Arc<GlobalConfig>) {
if let GlobalConfigStatus::Pending(project_keys) = std::mem::replace(
@@ -502,17 +508,13 @@ impl ProjectCacheBroker {
}
}

-fn get_global_config(&self) -> Option<Arc<GlobalConfig>> {
+fn global_config_value(&self) -> Option<Arc<GlobalConfig>> {
match &self.global_config {
GlobalConfigStatus::Ready(gc) => Some(gc.clone()),
GlobalConfigStatus::Pending(_) => None,
}
}

-fn global_config_ready(&self) -> bool {
-matches!(self.global_config, GlobalConfigStatus::Ready(_))
-}

/// Adds the value to the queue for the provided key.
pub fn enqueue(&mut self, key: QueueKey, value: ManagedEnvelope) {
self.index.entry(key.own_key).or_default().insert(key);
@@ -733,7 +735,7 @@ impl ProjectCacheBroker {
fn handle_processing(&mut self, managed_envelope: ManagedEnvelope) {
let project_key = managed_envelope.envelope().meta().public_key();

-let Some(global_config) = self.get_global_config() else {
+let Some(global_config) = self.global_config_value() else {
// This indicates a logical error in our approach, this function should only
// be called when we have a global config.
relay_log::error!("attempted to process envelope without global config");
@@ -820,7 +822,7 @@ impl ProjectCacheBroker {
if project_state.is_some()
&& (sampling_state.is_some() || sampling_key.is_none())
&& !self.buffer_guard.is_over_high_watermark()
-&& self.global_config_ready()
+&& self.global_config.is_ready()
{
return self.handle_processing(context);
}
@@ -965,7 +967,7 @@ impl Service for ProjectCacheService {
relay_log::info!("global config received");
GlobalConfigStatus::Ready(global_config)
}
-global_config::Status::Waiting => {
+global_config::Status::Pending => {
relay_log::info!("waiting for global config");
GlobalConfigStatus::Pending(BTreeSet::new())
}
@@ -989,22 +991,23 @@ impl Service for ProjectCacheService {

loop {
tokio::select! {
-biased;
-Ok(()) = subscription.changed() => {
-match subscription.borrow().clone() {
-global_config::Status::Ready(global_config) => broker.set_global_config(global_config),
-// The watch should only be updated if it gets a new value.
-// This would imply a logical bug.
-global_config::Status::Waiting => relay_log::error!("still waiting for the global config"),
+biased;
+
+Ok(()) = subscription.changed() => {
+match subscription.borrow().clone() {
+global_config::Status::Ready(global_config) => broker.set_global_config(global_config),
+// The watch should only be updated if it gets a new value.
+// This would imply a logical bug.
+global_config::Status::Pending => relay_log::error!("still waiting for the global config"),
}
},
-Some(message) = state_rx.recv() => broker.merge_state(message),
-// Buffer will not dequeue the envelopes from the spool if there is not enough
-// permits in `BufferGuard` available. Currently this is 50%.
-Some(managed_envelope) = buffer_rx.recv() => broker.handle_processing(managed_envelope),
-_ = ticker.tick() => broker.evict_stale_project_caches(),
-Some(message) = rx.recv() => broker.handle_message(message),
-else => break,
+Some(message) = state_rx.recv() => broker.merge_state(message),
+// Buffer will not dequeue the envelopes from the spool if there is not enough
+// permits in `BufferGuard` available. Currently this is 50%.
+Some(managed_envelope) = buffer_rx.recv() => broker.handle_processing(managed_envelope),
+_ = ticker.tick() => broker.evict_stale_project_caches(),
+Some(message) = rx.recv() => broker.handle_message(message),
+else => break,
}
}

48 changes: 48 additions & 0 deletions tests/integration/test_envelope.py
@@ -579,3 +579,51 @@ def test_sample_rates_metrics(mini_sentry, relay_with_processing, events_consume

event, _ = events_consumer.get_event()
assert event["_metrics"]["sample_rates"] == sample_rates


# Checks that we buffer envelopes until we have a global config in processing mode.
def test_buffer_envelopes_without_global_config(
mini_sentry, relay_with_processing, events_consumer
):
include_global = False
original_endpoint = mini_sentry.app.view_functions["get_project_config"]

@mini_sentry.app.endpoint("get_project_config")
def get_project_config():
nonlocal include_global

res = original_endpoint().get_json()
if not include_global:
res.pop("global", None)
return res

mini_sentry.add_basic_project_config(42)
events_consumer = events_consumer()
options = {"cache": {"global_config_fetch_interval": 1}}
relay = relay_with_processing(options=options)

envelope = Envelope()
envelope.add_event({"message": "hello, world!"})
relay.send_envelope(42, envelope)

error_raised = False
try:
# Event is still in buffer because we have not provided a global config
events_consumer.get_event(timeout=1)
except AssertionError:
error_raised = True
assert error_raised

include_global = True
# Clear errors because we log an error when we request the global config but don't receive it.
assert len(mini_sentry.test_failures) > 0
for err in mini_sentry.test_failures:
assert (
str(err[1])
== "Relay sent us event: global config missing in upstream response"
)
mini_sentry.test_failures.clear()

# Global configs are fetched in 10 second intervals, so the event should have come
# through after a 10 sec timeout.
events_consumer.get_event(timeout=10)
Contributor

Please, assert that after the global config is fetched we get all sent envelopes here.

Contributor Author

Do you mean to do it in a more explicit manner? Because afaict, calling get_event already asserts that we got the envelope, since it raises if no event is present.
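
A more explicit check along the lines the reviewer asks for might look like the sketch below. It is only an illustration: `drain_events` is a hypothetical helper, and a plain `queue.Queue` stands in for the events consumer, so none of this reflects the actual fixture API used in the test suite.

```python
import queue


def drain_events(events, expected_count, timeout=1.0):
    """Collect `expected_count` events, failing loudly if one is missing."""
    received = []
    for _ in range(expected_count):
        try:
            received.append(events.get(timeout=timeout))
        except queue.Empty:
            raise AssertionError(
                f"expected {expected_count} events, got {len(received)}"
            )
    return received


# Hypothetical stand-in for the consumer: two buffered events are released
# once the global config has arrived.
events = queue.Queue()
sent = [{"message": "hello, world!"}, {"message": "second"}]
for event in sent:
    events.put(event)

received = drain_events(events, expected_count=len(sent))
assert received == sent  # every sent envelope came through, in order
assert events.empty()    # and nothing unexpected is left over
```

Compared with a single `get_event` call, this makes the expected number of envelopes and the "nothing left over" condition part of the assertion itself.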

4 changes: 2 additions & 2 deletions tests/integration/test_query.py
@@ -140,9 +140,9 @@ def get_project_config():
try:
relay.send_event(42)

-event = mini_sentry.captured_events.get(timeout=8).get_event()
+event = mini_sentry.captured_events.get(timeout=12).get_event()
assert event["logentry"] == {"formatted": "Hello, World!"}
-assert retry_count == 2
+assert retry_count == 4
Comment on lines +143 to +145
Contributor Author

I can look deeper into it, but I believe it simply needs more retries: after the first 2 retries it has to wait a bit before it gets the global config and can proceed.

Contributor

Why do project configs need more retries with this PR?

if mini_sentry.test_failures:
for _, error in mini_sentry.test_failures: