Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Delay serving global config in managed mode before upstream response #2697

Merged
merged 43 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
370eb3b
init
TBS1996 Nov 8, 2023
8634049
wip
TBS1996 Nov 8, 2023
b250419
wip
TBS1996 Nov 8, 2023
8bc4c6b
wip
TBS1996 Nov 8, 2023
f155001
wip
TBS1996 Nov 8, 2023
0266eb0
wip
TBS1996 Nov 9, 2023
d555e7b
wip
TBS1996 Nov 9, 2023
c6206bc
wip
TBS1996 Nov 9, 2023
0cdffda
wip
TBS1996 Nov 9, 2023
dc13b9c
wip
TBS1996 Nov 9, 2023
9600a42
wip
TBS1996 Nov 9, 2023
397c477
wip
TBS1996 Nov 9, 2023
0e9a04e
wip
TBS1996 Nov 9, 2023
6b68257
wip
TBS1996 Nov 9, 2023
1a68659
wip
TBS1996 Nov 9, 2023
dc3efad
Merge branch 'master' into tor/gcbuffer
TBS1996 Nov 9, 2023
9e1d168
wip
TBS1996 Nov 9, 2023
249b77b
wip
TBS1996 Nov 10, 2023
0c944c3
wip
TBS1996 Nov 10, 2023
03758eb
Merge branch 'master' into tor/gcbuffer
TBS1996 Nov 10, 2023
522ea67
wip
TBS1996 Nov 10, 2023
3aebe13
fix changelog
TBS1996 Nov 10, 2023
c35d4ce
address feedback
TBS1996 Nov 10, 2023
a8b8ef0
Merge branch 'master' into tor/gcbuffer
TBS1996 Nov 13, 2023
cd3b1ef
replace send with send_replace
TBS1996 Nov 13, 2023
33dd47a
fix lint
TBS1996 Nov 13, 2023
5955890
formatting
TBS1996 Nov 13, 2023
faf7794
questionable commit
TBS1996 Nov 13, 2023
6b75701
add test
TBS1996 Nov 13, 2023
3d3ec26
speed up test
TBS1996 Nov 13, 2023
2b5dde1
nit
TBS1996 Nov 13, 2023
fa9e47e
name changes
TBS1996 Nov 13, 2023
fbd2fc8
nit
TBS1996 Nov 13, 2023
19a366a
Waiting->Pending
TBS1996 Nov 14, 2023
a6cd5f1
nit
TBS1996 Nov 15, 2023
06e262c
Merge branch 'master' into tor/gcbuffer
TBS1996 Nov 15, 2023
86a396f
address feedback
TBS1996 Nov 17, 2023
1e8d941
Merge branch 'master' into tor/gcbuffer
TBS1996 Nov 17, 2023
a6dd1c7
fix changelog entry
TBS1996 Nov 17, 2023
96fb0e1
sleep before assert empty
TBS1996 Nov 17, 2023
f5adc52
use timeout in assert_empty
TBS1996 Nov 19, 2023
887bc98
Merge branch 'master' into tor/gcbuffer
TBS1996 Nov 21, 2023
4bb1cbf
merge
TBS1996 Nov 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
- License is now FSL instead of BSL ([#2739](https://github.com/getsentry/relay/pull/2739))
- Support `device.model` in dynamic sampling and metric extraction. ([#2728](https://github.com/getsentry/relay/pull/2728))
- Support comparison operators (`>`, `>=`, `<`, `<=`) for strings in dynamic sampling and metric extraction rules. Previously, these comparisons were only possible on numbers. ([#2730](https://github.com/getsentry/relay/pull/2730))
- Postpone processing till the global config is available. ([#2697](https://github.com/getsentry/relay/pull/2697))
- Skip running `NormalizeProcessor` on renormalization. ([#2744](https://github.com/getsentry/relay/pull/2744))


## 23.11.0

**Features**:
Expand Down
69 changes: 52 additions & 17 deletions relay-server/src/actors/global_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,32 +105,59 @@ pub struct Subscribe;
/// frequency from upstream.
pub enum GlobalConfigManager {
/// Returns the most recent global config.
Get(relay_system::Sender<Arc<GlobalConfig>>),
Get(relay_system::Sender<Status>),
/// Returns a [`watch::Receiver`] where global config updates will be sent to.
Subscribe(relay_system::Sender<watch::Receiver<Arc<GlobalConfig>>>),
Subscribe(relay_system::Sender<watch::Receiver<Status>>),
}

impl Interface for GlobalConfigManager {}

impl FromMessage<Get> for GlobalConfigManager {
type Response = AsyncResponse<Arc<GlobalConfig>>;
type Response = AsyncResponse<Status>;

fn from_message(_: Get, sender: relay_system::Sender<Arc<GlobalConfig>>) -> Self {
fn from_message(_: Get, sender: relay_system::Sender<Status>) -> Self {
Self::Get(sender)
}
}

impl FromMessage<Subscribe> for GlobalConfigManager {
type Response = AsyncResponse<watch::Receiver<Arc<GlobalConfig>>>;
type Response = AsyncResponse<watch::Receiver<Status>>;

fn from_message(
_: Subscribe,
sender: relay_system::Sender<watch::Receiver<Arc<GlobalConfig>>>,
) -> Self {
fn from_message(_: Subscribe, sender: relay_system::Sender<watch::Receiver<Status>>) -> Self {
Self::Subscribe(sender)
}
}

/// Describes the current fetching status of the [`GlobalConfig`] from the upstream.
#[derive(Debug, Clone, Default)]
pub enum Status {
iker-barriocanal marked this conversation as resolved.
Show resolved Hide resolved
/// Global config ready to be used by other services.
///
/// This variant implies different things in different circumstances. In managed mode, it means
/// that we have received a config from upstream. In other modes the config is either
/// from a file or the default global config.
Ready(Arc<GlobalConfig>),
/// The global config is requested from the upstream but it has not arrived yet.
///
/// This variant should never be sent after the first `Ready` has occured.
#[default]
Pending,
}

impl Status {
fn is_ready(&self) -> bool {
matches!(self, Self::Ready(_))
}

/// Similar to Option::unwrap_or_default.
pub fn get_ready_or_default(self) -> Arc<GlobalConfig> {
match self {
Status::Ready(global_config) => global_config,
Status::Pending => Arc::default(),
}
}
}

/// Service implementing the [`GlobalConfigManager`] interface.
///
/// The service offers two alternatives to fetch the [`GlobalConfig`]:
Expand All @@ -140,7 +167,7 @@ impl FromMessage<Subscribe> for GlobalConfigManager {
pub struct GlobalConfigService {
config: Arc<Config>,
/// Sender of the [`watch`] channel for the subscribers of the service.
global_config_watch: watch::Sender<Arc<GlobalConfig>>,
global_config_watch: watch::Sender<Status>,
/// Sender of the internal channel to forward global configs from upstream.
internal_tx: mpsc::Sender<UpstreamQueryResult>,
/// Receiver of the internal channel to forward global configs from upstream.
Expand All @@ -161,7 +188,7 @@ impl GlobalConfigService {
/// Creates a new [`GlobalConfigService`].
pub fn new(config: Arc<Config>, upstream: Addr<UpstreamRelay>) -> Self {
let (internal_tx, internal_rx) = mpsc::channel(1);
let (global_config_watch, _) = watch::channel(Arc::default());
let (global_config_watch, _) = watch::channel(Status::Pending);

Self {
config,
Expand All @@ -177,7 +204,7 @@ impl GlobalConfigService {
}

/// Handles messages from external services.
fn handle_message(&self, message: GlobalConfigManager) {
fn handle_message(&mut self, message: GlobalConfigManager) {
match message {
GlobalConfigManager::Get(sender) => {
sender.send(self.global_config_watch.borrow().clone());
Expand Down Expand Up @@ -231,9 +258,12 @@ impl GlobalConfigService {
let mut success = false;
match config.global {
Some(global_config) => {
// Notifying subscribers only fails when there are no
// subscribers.
TBS1996 marked this conversation as resolved.
Show resolved Hide resolved
self.global_config_watch.send(Arc::new(global_config)).ok();
// Log the first time we receive a global config from upstream.
if !self.global_config_watch.borrow().is_ready() {
relay_log::info!("received global config from upstream");
}
self.global_config_watch
.send_replace(Status::Ready(Arc::new(global_config)));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we replace send with send_replace because in case theres not yet a receiver send_replace will populate the receiver when it gets constructed later.

success = true;
self.last_fetched = Instant::now();
}
Expand Down Expand Up @@ -277,24 +307,29 @@ impl Service for GlobalConfigService {

relay_log::info!("global config service starting");
if self.config.relay_mode() == RelayMode::Managed {
relay_log::info!("serving global configs fetched from upstream");
relay_log::info!("requesting global config from upstream");
self.request_global_config();
} else {
match GlobalConfig::load(self.config.path()) {
Ok(Some(from_file)) => {
relay_log::info!("serving static global config loaded from file");
self.global_config_watch.send(Arc::new(from_file)).ok();
self.global_config_watch
.send_replace(Status::Ready(Arc::new(from_file)));
}
Ok(None) => {
relay_log::info!(
"serving default global configs due to lacking static global config file"
);
self.global_config_watch
.send_replace(Status::Ready(Arc::default()));
}
Err(e) => {
relay_log::error!("failed to load global config from file: {}", e);
relay_log::info!(
"serving default global configs due to failure to load global config from file"
);
self.global_config_watch
.send_replace(Status::Ready(Arc::default()));
}
}
};
Expand Down
42 changes: 16 additions & 26 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ use {
};

use crate::actors::envelopes::{EnvelopeManager, SendEnvelope, SendEnvelopeError, SubmitEnvelope};
use crate::actors::global_config::{GlobalConfigManager, Subscribe};
use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::actors::project::ProjectState;
use crate::actors::project_cache::ProjectCache;
Expand Down Expand Up @@ -319,6 +318,9 @@ struct ProcessEnvelopeState<'a> {

/// Reservoir evaluator that we use for dynamic sampling.
reservoir: ReservoirEvaluator<'a>,

/// Global config used for envelope processing.
global_config: Arc<GlobalConfig>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved global config from the envelopeprocessor to the processorstate

}

impl<'a> ProcessEnvelopeState<'a> {
Expand Down Expand Up @@ -438,6 +440,7 @@ pub struct ProcessEnvelope {
pub project_state: Arc<ProjectState>,
pub sampling_project_state: Option<Arc<ProjectState>>,
pub reservoir_counters: ReservoirCounters,
pub global_config: Arc<GlobalConfig>,
}

/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
Expand Down Expand Up @@ -539,7 +542,6 @@ impl FromMessage<RateLimitBuckets> for EnvelopeProcessor {
/// This service handles messages in a worker pool with configurable concurrency.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now that 'inner' is irrelevant, i'll remove it in following pr

global_config: Arc<GlobalConfig>,
inner: Arc<InnerProcessor>,
}

Expand All @@ -549,7 +551,6 @@ struct InnerProcessor {
redis_pool: Option<RedisPool>,
envelope_manager: Addr<EnvelopeManager>,
project_cache: Addr<ProjectCache>,
global_config: Addr<GlobalConfigManager>,
outcome_aggregator: Addr<TrackOutcome>,
#[cfg(feature = "processing")]
aggregator: Addr<Aggregator>,
Expand All @@ -568,7 +569,6 @@ impl EnvelopeProcessorService {
envelope_manager: Addr<EnvelopeManager>,
outcome_aggregator: Addr<TrackOutcome>,
project_cache: Addr<ProjectCache>,
global_config: Addr<GlobalConfigManager>,
upstream_relay: Addr<UpstreamRelay>,
#[cfg(feature = "processing")] aggregator: Addr<Aggregator>,
) -> Self {
Expand All @@ -591,7 +591,6 @@ impl EnvelopeProcessorService {
config,
envelope_manager,
project_cache,
global_config,
outcome_aggregator,
upstream_relay,
geoip_lookup,
Expand All @@ -600,7 +599,6 @@ impl EnvelopeProcessorService {
};

Self {
global_config: Arc::default(),
inner: Arc::new(inner),
}
}
Expand Down Expand Up @@ -1354,6 +1352,7 @@ impl EnvelopeProcessorService {
project_state,
sampling_project_state,
reservoir_counters,
global_config,
} = message;

let envelope = managed_envelope.envelope_mut();
Expand Down Expand Up @@ -1408,6 +1407,7 @@ impl EnvelopeProcessorService {
managed_envelope,
profile_id: None,
reservoir,
global_config,
})
}

Expand Down Expand Up @@ -2731,7 +2731,7 @@ impl EnvelopeProcessorService {
enable_trimming: true,
measurements: Some(DynamicMeasurementsConfig::new(
state.project_state.config().measurements.as_ref(),
self.global_config.measurements.as_ref(),
state.global_config.measurements.as_ref(),
)),
};

Expand Down Expand Up @@ -3091,25 +3091,13 @@ impl EnvelopeProcessorService {
impl Service for EnvelopeProcessorService {
type Interface = EnvelopeProcessor;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
let thread_count = self.inner.config.cpu_concurrency();
relay_log::info!("starting {thread_count} envelope processing workers");

tokio::spawn(async move {
let semaphore = Arc::new(Semaphore::new(thread_count));

let Ok(mut subscription) = self.inner.global_config.send(Subscribe).await else {
// TODO(iker): we accept this sub-optimal error handling. TBD
// the approach to deal with failures on the subscription
// mechanism.
relay_log::error!("failed to subscribe to GlobalConfigService");
return;
};

// In case we use static global config, the watch wont be updated repeatedly, so we
// should immediatly use the content of the watch.
self.global_config = subscription.borrow().clone();

loop {
let next_msg = async {
let permit_result = semaphore.clone().acquire_owned().await;
Expand All @@ -3122,7 +3110,6 @@ impl Service for EnvelopeProcessorService {
tokio::select! {
biased;

Ok(()) = subscription.changed() => self.global_config = subscription.borrow().clone(),
(Some(message), Ok(permit)) = next_msg => {
let service = self.clone();
tokio::task::spawn_blocking(move || {
Expand Down Expand Up @@ -3393,6 +3380,7 @@ mod tests {
profile_id: None,
event_metrics_extracted: false,
reservoir: dummy_reservoir(),
global_config: Arc::default(),
}
};

Expand Down Expand Up @@ -3574,7 +3562,6 @@ mod tests {
let (outcome_aggregator, _) = mock_service("outcome_aggregator", (), |&mut (), _| {});
let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {});
let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {});
let (global_config, _) = mock_service("global_config", (), |&mut (), _| {});
#[cfg(feature = "processing")]
let (aggregator, _) = mock_service("aggregator", (), |&mut (), _| {});
let inner = InnerProcessor {
Expand All @@ -3588,13 +3575,11 @@ mod tests {
#[cfg(feature = "processing")]
redis_pool: None,
geoip_lookup: None,
global_config,
#[cfg(feature = "processing")]
aggregator,
};

EnvelopeProcessorService {
global_config: Arc::default(),
inner: Arc::new(inner),
}
}
Expand All @@ -3612,13 +3597,11 @@ mod tests {
#[cfg(feature = "processing")]
redis_pool: None,
geoip_lookup: None,
global_config: Addr::dummy(),
#[cfg(feature = "processing")]
aggregator: Addr::dummy(),
};

EnvelopeProcessorService {
global_config: Arc::default(),
inner: Arc::new(inner),
}
}
Expand Down Expand Up @@ -3653,6 +3636,7 @@ mod tests {
project_state: Arc::new(ProjectState::allowed()),
sampling_project_state: None,
reservoir_counters: ReservoirCounters::default(),
global_config: Arc::default(),
};

let envelope_response = processor.process(message).unwrap();
Expand All @@ -3675,6 +3659,7 @@ mod tests {
project_state: Arc::new(ProjectState::allowed()),
sampling_project_state,
reservoir_counters: ReservoirCounters::default(),
global_config: Arc::default(),
};

let envelope_response = processor.process(message).unwrap();
Expand Down Expand Up @@ -3886,6 +3871,7 @@ mod tests {
project_state: Arc::new(project_state),
sampling_project_state: None,
reservoir_counters: ReservoirCounters::default(),
global_config: Arc::default(),
};

let envelope_response = processor.process(message).unwrap();
Expand Down Expand Up @@ -3957,6 +3943,7 @@ mod tests {
project_state: Arc::new(ProjectState::allowed()),
sampling_project_state: None,
reservoir_counters: ReservoirCounters::default(),
global_config: Arc::default(),
};

let envelope_response = processor.process(message).unwrap();
Expand Down Expand Up @@ -4006,6 +3993,7 @@ mod tests {
project_state: Arc::new(ProjectState::allowed()),
sampling_project_state: None,
reservoir_counters: ReservoirCounters::default(),
global_config: Arc::default(),
};

let envelope_response = processor.process(message).unwrap();
Expand Down Expand Up @@ -4063,6 +4051,7 @@ mod tests {
project_state: Arc::new(ProjectState::allowed()),
sampling_project_state: None,
reservoir_counters: ReservoirCounters::default(),
global_config: Arc::default(),
};

let envelope_response = processor.process(message).unwrap();
Expand Down Expand Up @@ -4551,6 +4540,7 @@ mod tests {
project_state: Arc::new(project_state),
sampling_project_state: None,
reservoir_counters: ReservoirCounters::default(),
global_config: Arc::default(),
};

let envelope_response = processor.process(message).unwrap();
Expand Down
Loading
Loading