Skip to content

Commit

Permalink
feat(server): Return global config status for downstream requests (#2765
Browse files Browse the repository at this point in the history
)

follow-up to #2697


we now have a mechanism to delay envelope processing until a global
config is available, however, we currently send back a default global
config if one is requested from downstream, which kind of defeats the
purpose if the downstream relay believes it has a valid global config
but it doesn't.
 
To fix this, this pr will return an additional flag whether the config
it sends is ready or not, up to date downstream relays will not use them
but instead keep trying to get a ready global config and not process
envelopes until they do. Older relays won't deserialize the status flag
and their behaviour will be as before.
  • Loading branch information
TBS1996 authored Nov 29, 2023
1 parent 8d52fcb commit 04bc4c9
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
**Features**:

- Partition and split metric buckets just before sending. Log outcomes for metrics. ([#2682](https://github.com/getsentry/relay/pull/2682))
- Return global config ready status to downstream relays. ([#2765](https://github.com/getsentry/relay/pull/2765))

**Internal**:

Expand Down
37 changes: 26 additions & 11 deletions relay-server/src/actors/global_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,24 @@ type UpstreamQueryResult =
struct GetGlobalConfigResponse {
#[serde(default)]
global: Option<GlobalConfig>,
// Instead of using [`Status`], we use StatusResponse as a separate field in order to not
// make breaking changes to the api.
#[serde(default)]
global_status: Option<StatusResponse>,
}

/// A mirror of [`Status`] without the associated data for use in serialization.
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum StatusResponse {
Ready,
Pending,
}

impl StatusResponse {
pub fn is_ready(self) -> bool {
matches!(self, Self::Ready)
}
}

/// The request to fetch a global config from upstream.
Expand Down Expand Up @@ -148,14 +166,6 @@ impl Status {
fn is_ready(&self) -> bool {
matches!(self, Self::Ready(_))
}

/// Similar to Option::unwrap_or_default.
pub fn get_ready_or_default(self) -> Arc<GlobalConfig> {
match self {
Status::Ready(global_config) => global_config,
Status::Pending => Arc::default(),
}
}
}

/// Service implementing the [`GlobalConfigManager`] interface.
Expand Down Expand Up @@ -254,10 +264,14 @@ impl GlobalConfigService {
/// global config is valid and contains the expected data.
fn handle_result(&mut self, result: UpstreamQueryResult) {
match result {
Ok(Ok(config)) => {
Ok(Ok(response)) => {
let mut success = false;
match config.global {
Some(global_config) => {
// Older relays won't send a global status, in that case, we will pretend like the
// default global config is an up to date one, because that was the old behaviour.
let is_ready = response.global_status.map_or(true, |stat| stat.is_ready());

match response.global {
Some(global_config) if is_ready => {
// Log the first time we receive a global config from upstream.
if !self.global_config_watch.borrow().is_ready() {
relay_log::info!("received global config from upstream");
Expand All @@ -267,6 +281,7 @@ impl GlobalConfigService {
success = true;
self.last_fetched = Instant::now();
}
Some(_) => relay_log::info!("global config from upstream is not yet ready"),
None => relay_log::error!("global config missing in upstream response"),
}
metric!(
Expand Down
32 changes: 19 additions & 13 deletions relay-server/src/endpoints/project_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use relay_base_schema::project::ProjectKey;
use relay_dynamic_config::{ErrorBoundary, GlobalConfig};
use serde::{Deserialize, Serialize};

use crate::actors::global_config;
use crate::actors::global_config::{self, StatusResponse};
use crate::actors::project::{LimitedProjectState, ProjectState};
use crate::actors::project_cache::{GetCachedProjectState, GetProjectState};
use crate::endpoints::common::ServiceUnavailable;
Expand Down Expand Up @@ -81,6 +81,8 @@ struct GetProjectStatesResponseWrapper {
pending: Vec<ProjectKey>,
#[serde(skip_serializing_if = "Option::is_none")]
global: Option<Arc<GlobalConfig>>,
#[serde(skip_serializing_if = "Option::is_none")]
global_status: Option<StatusResponse>,
}

/// Request payload of the project config endpoint.
Expand Down Expand Up @@ -127,19 +129,22 @@ async fn inner(
(project_key, state_result)
});

let mut configs = HashMap::with_capacity(keys_len);
let mut pending = Vec::with_capacity(keys_len);
let global_config = match inner.global {
true => Some(
state
.global_config()
.send(global_config::Get)
.await?
.get_ready_or_default(),
),
false => None,
let (global, global_status) = if inner.global {
match state.global_config().send(global_config::Get).await? {
global_config::Status::Ready(config) => (Some(config), Some(StatusResponse::Ready)),
// Old relays expect to get a global config no matter what, even if it's not ready
// yet. We therefore give them a default global config.
global_config::Status::Pending => (
Some(GlobalConfig::default().into()),
Some(StatusResponse::Pending),
),
}
} else {
(None, None)
};

let mut pending = Vec::with_capacity(keys_len);
let mut configs = HashMap::with_capacity(keys_len);
for (project_key, state_result) in future::join_all(futures).await {
let Some(project_state) = state_result? else {
pending.push(project_key);
Expand Down Expand Up @@ -170,7 +175,8 @@ async fn inner(
Ok(Json(GetProjectStatesResponseWrapper {
configs,
pending,
global: global_config,
global,
global_status,
}))
}

Expand Down

0 comments on commit 04bc4c9

Please sign in to comment.