ref(server): Simplify global config subscription #4004

Merged · 3 commits · Sep 9, 2024
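`GlobalConfigService::new` now returns its `watch::Receiver<Status>` alongside the service, and `ServiceState` hands that receiver directly to `ProjectCacheService::new`. With the receiver available at construction time, the `Subscribe` message on `GlobalConfigManager`, its `FromMessage` implementation, and the `global_config` address in the project cache's `Services` struct are all removed, along with the error path for a failed subscription. Hedged sketches of the underlying `tokio::sync::watch` pattern follow each file's diff below.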
5 changes: 3 additions & 2 deletions relay-server/src/service.rs
@@ -177,7 +177,8 @@ impl ServiceState {
.start();
let outcome_aggregator = OutcomeAggregator::new(&config, outcome_producer.clone()).start();

let global_config = GlobalConfigService::new(config.clone(), upstream_relay.clone());
let (global_config, global_config_rx) =
GlobalConfigService::new(config.clone(), upstream_relay.clone());
let global_config_handle = global_config.handle();
// The global config service must start before dependant services are
// started. Messages like subscription requests to the global config
@@ -256,13 +257,13 @@ impl ServiceState {
project_cache: project_cache.clone(),
test_store: test_store.clone(),
upstream_relay: upstream_relay.clone(),
global_config: global_config.clone(),
};

ProjectCacheService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
project_cache_services,
global_config_rx,
redis_pools
.as_ref()
.map(|pools| pools.project_configs.clone()),
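The wiring above replaces the old `Subscribe` round trip with a receiver returned from the constructor. Below is a minimal sketch of that pattern using only `tokio::sync::watch`; `ConfigService`, `Status::Ready(String)`, and `publish` are illustrative stand-ins, not Relay's actual types.

```rust
use tokio::sync::watch;

#[derive(Clone, Debug)]
enum Status {
    Pending,
    Ready(String), // stand-in for Arc<GlobalConfig>
}

struct ConfigService {
    tx: watch::Sender<Status>,
}

impl ConfigService {
    // Hand the receiver out at construction time, so callers do not need a
    // separate subscription message once the service is running.
    fn new() -> (Self, watch::Receiver<Status>) {
        let (tx, rx) = watch::channel(Status::Pending);
        (Self { tx }, rx)
    }

    fn publish(&self, status: Status) {
        // `send` only errors when every receiver has been dropped; ignore that.
        let _ = self.tx.send(status);
    }
}

#[tokio::main]
async fn main() {
    let (service, mut rx) = ConfigService::new();
    service.publish(Status::Ready("config v1".into()));

    rx.changed().await.expect("sender still alive");
    println!("current status: {:?}", *rx.borrow());
}
```

Because the receiver exists as soon as the constructor runs, consumers can hold their subscription before any service has started, and the failure mode of sending `Subscribe` to a service that is not yet running disappears.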
76 changes: 30 additions & 46 deletions relay-server/src/services/global_config.rs
@@ -104,27 +104,14 @@ impl UpstreamQuery for GetGlobalConfig {
/// The message for requesting the most recent global config from [`GlobalConfigService`].
pub struct Get;

/// The message for receiving a watch that subscribes to the [`GlobalConfigService`].
///
/// The global config service must be up and running, else the subscription
/// fails. Subscribers should use the initial value when they get the watch
/// rather than only waiting for the watch to update, in case a global config
/// is only updated once, such as is the case with the static config file.
pub struct Subscribe;

/// An interface to get [`GlobalConfig`]s through [`GlobalConfigService`].
///
/// For a one-off update, [`GlobalConfigService`] responds to
/// [`GlobalConfigManager::Get`] messages with the latest instance of the
/// [`GlobalConfig`]. For continued updates, you can subscribe with
/// [`GlobalConfigManager::Subscribe`] to get a receiver back where up-to-date
/// instances will be sent to, while [`GlobalConfigService`] manages the update
/// frequency from upstream.
/// [`GlobalConfig`].
pub enum GlobalConfigManager {
/// Returns the most recent global config.
Get(relay_system::Sender<Status>),
/// Returns a [`watch::Receiver`] where global config updates will be sent to.
Subscribe(relay_system::Sender<watch::Receiver<Status>>),
}

impl Interface for GlobalConfigManager {}
@@ -137,14 +124,6 @@ impl FromMessage<Get> for GlobalConfigManager {
}
}

impl FromMessage<Subscribe> for GlobalConfigManager {
type Response = AsyncResponse<watch::Receiver<Status>>;

fn from_message(_: Subscribe, sender: relay_system::Sender<watch::Receiver<Status>>) -> Self {
Self::Subscribe(sender)
}
}

/// Describes the current fetching status of the [`GlobalConfig`] from the upstream.
#[derive(Debug, Clone, Default)]
pub enum Status {
@@ -201,10 +180,6 @@ impl fmt::Debug for GlobalConfigHandle {
}

/// Service implementing the [`GlobalConfigManager`] interface.
///
/// The service offers two alternatives to fetch the [`GlobalConfig`]:
/// responding to a [`Get`] message with the config for one-off requests, or
/// subscribing to updates with [`Subscribe`] to keep up-to-date.
#[derive(Debug)]
pub struct GlobalConfigService {
config: Arc<Config>,
@@ -228,21 +203,27 @@ pub struct GlobalConfigService {

impl GlobalConfigService {
/// Creates a new [`GlobalConfigService`].
pub fn new(config: Arc<Config>, upstream: Addr<UpstreamRelay>) -> Self {
pub fn new(
config: Arc<Config>,
upstream: Addr<UpstreamRelay>,
) -> (Self, watch::Receiver<Status>) {
let (internal_tx, internal_rx) = mpsc::channel(1);
let (global_config_watch, _) = watch::channel(Status::Pending);

Self {
config,
global_config_watch,
internal_tx,
internal_rx,
upstream,
fetch_handle: SleepHandle::idle(),
last_fetched: Instant::now(),
upstream_failure_interval: Duration::from_secs(35),
shutdown: false,
}
let (global_config_watch, rx) = watch::channel(Status::Pending);

(
Self {
config,
global_config_watch,
internal_tx,
internal_rx,
upstream,
fetch_handle: SleepHandle::idle(),
last_fetched: Instant::now(),
upstream_failure_interval: Duration::from_secs(35),
shutdown: false,
},
rx,
)
}

/// Creates a [`GlobalConfigHandle`] which can be used to retrieve the current state
@@ -259,9 +240,6 @@ impl GlobalConfigService {
GlobalConfigManager::Get(sender) => {
sender.send(self.global_config_watch.borrow().clone());
}
GlobalConfigManager::Subscribe(sender) => {
sender.send(self.global_config_watch.subscribe());
}
}
}

@@ -440,7 +418,9 @@ mod tests {
config.regenerate_credentials(false).unwrap();
let fetch_interval = config.global_config_fetch_interval();

let service = GlobalConfigService::new(Arc::new(config), upstream).start();
let service = GlobalConfigService::new(Arc::new(config), upstream)
.0
.start();

assert!(service.send(Get).await.is_ok());

@@ -469,7 +449,9 @@ mod tests {
config.regenerate_credentials(false).unwrap();

let fetch_interval = config.global_config_fetch_interval();
let service = GlobalConfigService::new(Arc::new(config), upstream).start();
let service = GlobalConfigService::new(Arc::new(config), upstream)
.0
.start();
service.send(Get).await.unwrap();

tokio::time::sleep(fetch_interval * 2).await;
@@ -494,7 +476,9 @@ mod tests {

let fetch_interval = config.global_config_fetch_interval();

let service = GlobalConfigService::new(Arc::new(config), upstream).start();
let service = GlobalConfigService::new(Arc::new(config), upstream)
.0
.start();
service.send(Get).await.unwrap();

tokio::time::sleep(fetch_interval * 2).await;
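The removed `Subscribe` doc comment carried one piece of advice that still applies to the returned receiver: consume the value already stored in the watch before waiting for updates, because a config loaded from a static file may be written only once and never again. A minimal, self-contained sketch of that consumption order, using a simplified `Status` enum rather than Relay's:

```rust
use tokio::sync::watch;

#[derive(Clone, Debug)]
enum Status {
    Pending,
    Ready(String), // stand-in for Arc<GlobalConfig>
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(Status::Pending);

    // The producer writes the config exactly once, e.g. from a static file.
    tx.send(Status::Ready("static config".into()))
        .expect("a receiver is still alive");

    // Consumer: inspect the value that is already in the channel first ...
    let current = rx.borrow().clone();
    match current {
        Status::Ready(config) => println!("config already available: {config}"),
        Status::Pending => {
            // ... and only wait for `changed()` if nothing has arrived yet.
            rx.changed().await.expect("sender still alive");
            println!("config received later: {:?}", *rx.borrow());
        }
    }
}
```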
25 changes: 9 additions & 16 deletions relay-server/src/services/project_cache.rs
@@ -5,6 +5,7 @@ use std::time::Duration;

use crate::extractors::RequestMeta;
use crate::services::buffer::{EnvelopeBuffer, EnvelopeBufferError};
use crate::services::global_config;
use crate::services::processor::{
EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics,
};
Expand All @@ -19,12 +20,11 @@ use relay_quotas::RateLimits;
use relay_redis::RedisPool;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, Sender, Service};
use tokio::sync::mpsc;
#[cfg(feature = "processing")]
use tokio::sync::Semaphore;
use tokio::sync::{mpsc, watch};
use tokio::time::Instant;

use crate::services::global_config::{self, GlobalConfigManager, Subscribe};
use crate::services::metrics::{Aggregator, FlushBuckets};
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::project::{Project, ProjectFetchState, ProjectSender, ProjectState};
@@ -585,7 +585,6 @@ pub struct Services {
pub project_cache: Addr<ProjectCache>,
pub test_store: Addr<TestStore>,
pub upstream_relay: Addr<UpstreamRelay>,
pub global_config: Addr<GlobalConfigManager>,
}

/// Main broker of the [`ProjectCacheService`].
@@ -1345,6 +1344,7 @@ pub struct ProjectCacheService {
config: Arc<Config>,
memory_checker: MemoryChecker,
services: Services,
global_config_rx: watch::Receiver<global_config::Status>,
redis: Option<RedisPool>,
}

@@ -1354,12 +1354,14 @@ impl ProjectCacheService {
config: Arc<Config>,
memory_checker: MemoryChecker,
services: Services,
global_config_rx: watch::Receiver<global_config::Status>,
redis: Option<RedisPool>,
) -> Self {
Self {
config,
memory_checker,
services,
global_config_rx,
redis,
}
}
@@ -1373,6 +1375,7 @@ impl Service for ProjectCacheService {
config,
memory_checker,
services,
mut global_config_rx,
redis,
} = self;
let project_cache = services.project_cache.clone();
@@ -1386,15 +1389,7 @@
// Channel for async project state responses back into the project cache.
let (state_tx, mut state_rx) = mpsc::unbounded_channel();

let Ok(mut subscription) = services.global_config.send(Subscribe).await else {
// TODO(iker): we accept this sub-optimal error handling. TBD
// the approach to deal with failures on the subscription
// mechanism.
relay_log::error!("failed to subscribe to GlobalConfigService");
return;
};

let global_config = match subscription.borrow().clone() {
let global_config = match global_config_rx.borrow().clone() {
global_config::Status::Ready(_) => {
relay_log::info!("global config received");
GlobalConfigStatus::Ready
@@ -1469,9 +1464,9 @@ impl Service for ProjectCacheService {
tokio::select! {
biased;

Ok(()) = subscription.changed() => {
Ok(()) = global_config_rx.changed() => {
metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "update_global_config", {
match subscription.borrow().clone() {
match global_config_rx.borrow().clone() {
global_config::Status::Ready(_) => broker.set_global_config_ready(),
// The watch should only be updated if it gets a new value.
// This would imply a logical bug.
@@ -1591,7 +1586,6 @@ mod tests {
let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {});
let (test_store, _) = mock_service("test_store", (), |&mut (), _| {});
let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {});
let (global_config, _) = mock_service("global_config", (), |&mut (), _| {});

Services {
envelope_buffer: None,
@@ -1601,7 +1595,6 @@
outcome_aggregator,
test_store,
upstream_relay,
global_config,
}
}

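In `project_cache.rs` the receiver handed in through `new` is polled inside the service's `tokio::select!` loop instead of a subscription obtained at startup. The sketch below mimics that shape with a simulated producer; the loop body, names, and timings are illustrative only, while `tokio::sync::watch`, `tokio::select!`, and `tokio::time` are real APIs.

```rust
use std::time::Duration;
use tokio::sync::watch;

#[derive(Clone, Debug)]
enum Status {
    Pending,
    Ready(String), // stand-in for Arc<GlobalConfig>
}

#[tokio::main]
async fn main() {
    let (tx, mut global_config_rx) = watch::channel(Status::Pending);

    // Simulated upstream fetch that publishes a config after a short delay.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = tx.send(Status::Ready("fetched config".into()));
    });

    let mut ticks = tokio::time::interval(Duration::from_millis(10));
    loop {
        tokio::select! {
            biased;

            Ok(()) = global_config_rx.changed() => {
                let status = global_config_rx.borrow().clone();
                match status {
                    Status::Ready(config) => {
                        println!("global config ready: {config}");
                        break;
                    }
                    // The watch only wakes this branch for new values, so
                    // observing `Pending` again would indicate a logic error
                    // on the producer side.
                    Status::Pending => unreachable!("watch updated without a value"),
                }
            }
            _ = ticks.tick() => {
                // Placeholder for the other work the real loop multiplexes.
            }
        }
    }
}
```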